Nopea katsaus Spark Structured Streamingiin + Kafkaan

Opi perusteet tämän tehokkaan kaksikon käyttämisestä stream-käsittelytehtäviin

Kuva Nikhita Singhal on Unsplash

Aloin hiljattain opiskella paljon Apache Kafkasta ja Apache Sparkista, jotka ovat kaksi johtavaa teknologiaa tietotekniikan maailmassa.

Olen tehnyt useita projekteja käyttäen niitä viime kuukausina; "Koneoppimisen suoratoisto Kafkan, Debeziumin ja BentoML:n kanssa” on esimerkki. Keskityn oppimaan luomaan tehokkaita dataputkia näillä nykyaikaisilla kuuluisilla työkaluilla ja ymmärtämään niiden edut ja haitat.

Viime kuukausina olen jo käsitellyt ETL-putkien luomista molemmilla työkaluilla, mutta en koskaan käyttämättä niitä yhdessä, ja tämä on aukko, jonka täytän tänään.

Tavoitteenamme on oppia Spark+Kafkan suoratoistosovelluksen rakentamisen yleinen idea ja antaa nopea katsaus sen pääkonsepteihin todellisen datan avulla.

Idea on yksinkertainen – Apache Kafka on viestien suoratoistotyökalu, jossa tuottajat kirjoittavat viestejä jonon toiseen päähän (ns. aihe) toisaalta kuluttajien luettavaksi.

Mutta se on erittäin monimutkainen työkalu, joka on rakennettu kestäväksi hajautettuun viestintäpalveluun, jossa on kaikenlaiset toimitustakuut (tarkan kerran, kerran, mikä tahansa), viestien tallennus ja viestien replikointi, samalla kun se mahdollistaa joustavuuden, skaalautuvuuden ja suuren suorituskyvyn. Sillä on laajempi joukko käyttötapauksia, kuten mikropalveluviestintä, reaaliaikaiset tapahtumajärjestelmät ja suoratoisto-ETL-putkistot.

Apache Spark on hajautettu muistipohjainen tiedonmuunnosmoottori.

Se on myös erittäin monimutkainen työkalu, joka pystyy muodostamaan yhteyden kaikenlaisiin tietokantoihin, tiedostojärjestelmiin ja pilviinfrastruktuuriin. Se on suunniteltu toimimaan hajautetuissa ympäristöissä rinnakkaisemaan koneiden välistä prosessointia ja saavuttamaan korkean suorituskyvyn muunnoksia käyttämällä laiska arviointifilosofiaa ja kyselyn optimointia.

Hienoa siinä on, että päivän päätteeksi koodi on vain tavallinen SQL-kyselysi tai (melkein) Python+pandas-skripti, ja kaikki noituudet on abstraktoitu mukavan käyttäjäystävällisen korkean tason API:n alla.

Yhdistä nämä kaksi tekniikkaa, niin meillä on täydellinen pari rakentaaksesi suoratoiston ETL-putkilinjan.

Käytämme Minas Geraisin (Brasilian) pääkaupungin Belo Horizonten (BH) kaupungin liikenneantureiden tietoja. Se on valtava tietojoukko, joka sisältää liikennevirtojen mittauksia useissa paikoissa kaupungissa. Jokainen anturi havaitsee määräajoin kyseisessä paikassa ajavan ajoneuvon tyypin (auto, moottoripyörä, bussi/kuorma-auto), sen nopeuden ja pituuden (sekä muita tietoja, joita emme aio käyttää).

Tämä tietojoukko edustaa juuri yhtä suoratoistojärjestelmien klassisista sovelluksista – ryhmää antureita, jotka lähettävät lukemiaan jatkuvasti kentältä.

Tässä skenaariossa Apache Kafkaa voidaan käyttää abstraktikerroksena anturien ja niiden tietoja kuluttavien sovellusten välillä.

Kafkaa käytettiin abstraktikerroksena lähteiden ja palveluiden välillä. Kuva tekijältä.

Tällaisella infrastruktuurilla on mahdollista rakentaa kaikenlaisia ​​(ns. reaaliaikaiset tapahtumapohjaiset järjestelmät, kuten ohjelma, joka havaitsee liikenneruuhkat ja varoittaa niistä, kun ajoneuvojen määrä äkillisesti kasvaa keskinopeuden laskeessa.

Ja siinä Apache Spark tulee peliin.

Siinä on natiivi moduuli streamin käsittelyä varten Spark Structured Streaming, joka voi muodostaa yhteyden Kafkaan ja käsitellä sen viestejä.

Ympäristön asettaminen

Tarvitset vain telakointiaseman ja telakointiaseman.

Käytämme Docker-Compose-tiedostokokoonpanoa, joka perustuu seuraaviin arkistoihin: linkin kipinä, linkki kafka.

- ./src volyymi on paikka, johon laitamme käsikirjoituksemme.

Käynnistä ympäristö vain juoksemalla

docker-säveltää ylös

Kaikki koodit löytyvät tästä GitHub-arkisto.

Yksi asioista, joista pidin eniten aloittaessani Sparkin opiskelun, oli sen kirjoitetun koodin ja tavanomaisten python+pandas-skriptien samankaltaisuus. Muutto oli erittäin helppoa.

Samaa logiikkaa noudattaen Sparkin suoratoistomoduuli on hyvin samankaltainen kuin tavallinen kipinäkoodi, mikä helpottaa siirtymistä eräsovelluksista stream-sovelluksiin.

Seuraavissa osioissa keskitymme kuitenkin Spark-strukturoidun suoratoiston erityispiirteisiin eli siihen, mitä uusia ominaisuuksia siinä on.

Ensimmäinen työpaikkamme

Aloitetaan hitaasti ja rakennetaan esimerkki lelusta

Ensimmäinen asia on luoda Kafka-aihe, josta kipinätyömme kuluttaa viestejä.

Tämän tekee pääsy Kafkan konttiterminaaliin ja suorittaa:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Simuloidaksesi tuottajaa kirjoittamassa viestejä tästä aiheesta, käytetään kafka-konsoli-tuottaja. Myös säiliön sisällä:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --ominaisuus "key.separator=:"

Tästä lähtien jokainen päätteeseen kirjoitettu rivi lähetetään viestinä testiaiheeseen. Merkkiä ":" käytetään erottamaan viestin avaimen ja arvon (avain:arvo).

Luodaan Spark-työ käsittelemään tätä aihetta.

Koodi on laitettava sisään /src/streaming kansio (ei mitään erikoista, vain valitsemani kansio).

Tärkeintä on huomata, että käytämme attribuutteja lue Stream ja writeStream, normaalin lukemisen ja kirjoittamisen sijaan. Tämä on tärkein näkökohta, jonka vuoksi Spark kohtelee työtämme suoratoistosovelluksena.

Jotta voit muodostaa yhteyden Kafkaan, sinun on määritettävä palvelin ja aihe. Vaihtoehto startOffsets=“aikaisintaan” käskee Sparkia lukemaan aiheen alusta. Myös siksi, että Kafka tallentaa viestinsä sisään binaarinen muodossa, ne on dekoodattava jono.

Muita vaihtoehtoja tutkitaan lisää.

Käydään nyt Spark-säilössä ja suoritetaan työ.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Muutaman sekunnin määrityksen jälkeen se alkaa kuluttaa aihetta.

Kipinä kuluttaa viestejä Kafkasta. Kuva tekijältä.

Spark Streaming toimii mikroerittely -tilassa, ja siksi näemme "erätiedot", kun se kuluttaa viestejä.

Mikroerätoiminto on jossain määrin täyden "todellisen" suoratoiston, jossa kaikki viestit käsitellään yksitellen, kun ne saapuvat, ja tavanomaisen erän välillä, jossa data pysyy staattisena ja kulutetaan tarpeen mukaan. Spark odottaa jonkin aikaa yrittääkseen kerätä viestejä käsitelläkseen ne yhdessä, mikä vähentää yleiskustannuksia ja lisää viivettä. Tämä voidaan säätää tarpeidesi mukaan.

En ole supernopea kirjoittaja, joten Spark käsittelee viestin ennen kuin voin sisällyttää uusia nykyiseen joukkoon.

Ja se oli ensimmäinen suoratoistotyömme!

Toivon, että saat tunteen: streamin käsittelytyötä ei ole vaikea koodata, mutta siinä on joitain hankaluuksia.

Tietojen kirjoittaminen Kafka-streamiin

Nyt on aika alkaa leikkiä anturitiedoilla.

Voit ladata vetoketju tiedosto ELOKUU 2022 ja pura se tiedostoon / data äänenvoimakkuutta. Tiedot ovat alun perin JSON-muodossa ja vievät noin 23 Gt tilaa. Ensimmäinen asia on muuntaa se parketiksi levytilan ja lukuajan optimoimiseksi.

Tämän tekemiseen tarvittavat kipinätyöt on kuvattu yksityiskohtaisesti GitHub-arkistossa, sinun tarvitsee vain suorittaa ne:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Koneesta riippuen suoritus voi kestää jonkin aikaa. Mutta se kannattaa, lopullinen parkettitiedoston koko on ~1Gb (yli 20x pienempi) ja paljon nopeampi lukea.

Meidän on myös luotava Kafka-aihe, jotta voimme vastaanottaa viestimme:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Valinnaisesti, jos haluat näyttää saapuvat viestit, on mahdollista perustaa konsolikuluttaja.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Datan kirjoittaminen Kafka-aiheesta on helppoa, mutta siinä on joitain yksityiskohtia.

Strukturoidussa suoratoistossa oletuskäyttäytyminen on, että dataskeemaa (sarakkeita ja niiden tyyppejä) ei yritetä päätellä, joten meidän on välitettävä sellainen.

Kafka-viestit ovat vain avainarvo-binäärimerkkijonopareja, joten meidän on esitettävä tietomme tässä muodossa. Tämä voidaan saavuttaa helposti muuntamalla kaikki rivit JSON-merkkijonoiksi, koodaamalla ne binäärimuodossa ja tallentamalla tulos "arvo"-sarakkeeseen.

Muunnetaan sarakkeita JSON-merkkijonoiksi. Kuva tekijältä.

Viestiavaimet ovat erittäin tärkeitä Kafkassa, mutta niistä ei ole hyötyä testeissämme, joten kaikilla viesteillä on sama.

Kuten aiemmin mainittiin, tämä tietojoukko on valtava, joten rajoitin lisättyjen viestien määrän 500,000 XNUMX:een.

Lopuksi ohitamme Kafka-palvelimen ja aiheen sekä "tarkistuspisteen sijainti”, johon kipinä tallentaa suorituksen edistymisen, mikä on hyödyllistä virheistä toipumisessa.

Työn suorittaminen:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Tietojen lisääminen Kafkaan. Kuva tekijältä.

Vasemmalla Spark-työ lukee tiedoston, oikealla a kafka-konsoli-kuluttaja näyttää saapuvat viestit.

Liikenneaiheemme on täynnä ja melkein valmis käsiteltäväksi.

On tärkeää muistaa, että käytimme kipinätyötä täyttääksemme aiheemme vain oppimistarkoituksiin. Todellisessa tilanteessa anturit itse lähettävät lukemat suoraan Kafkaan.

Tämän dynaamisen toiminnan simuloimiseksi alla oleva komentosarja kirjoittaa aiheeseen yhden rivin 1 sekunnin välein.

Tulostustilat — Ajoneuvojen määrän laskeminen tyypin mukaan

Jatketaan, luodaan työ, jossa lasketaan ajoneuvojen lukumäärä tyypin mukaan.

Sarake “Classificação” (luokitus) sisältää havaitun ajoneuvotyypin.

Kun luemme aihetta, meidän on muutettava JSON-binaariset merkkijonot takaisin sarakemuotoon.

Kun tämä on tehty, kysely voidaan rakentaa tavalliseen tapaan. On mielenkiintoista huomata, että kyselyn sydän on vain valita().groupBy().laskea() -sekvenssi, kaikki muu on suhteessa suoratoistologiikkaan.

Joten on aika käsitellä outputMode() vaihtoehto.

Stream-sovelluksen tulostustila määrittää, kuinka haluamme laskea (uudelleen) ja kirjoittaa tulokset uuden tiedon saapuessa.

Se voi olettaa kolmea eri arvoa:

  • Liitä: Lisää vain uusia tietueita tuotteeseen.
  • Täydellinen: Laske jokaisen uuden tietueen täydellinen tulos uudelleen.
  • Päivitykset: Päivitä muuttuneet tietueet.

Näillä tiloilla voi olla tai ei ole järkeä kirjoitetusta sovelluksesta riippuen. Esimerkiksi "valmis"-tilassa ei ehkä ole järkeä, jos ryhmittelyä tai lajittelua suoritetaan.

Suoritetaan työ "valmis"-tilassa ja katsotaan tuloksia.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Kuorma-auto, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. Kuva tekijältä.

Kun uusia tietueita lisätään streamiin (katso pääte oikealla), työ laskee koko tuloksen uudelleen. Tästä voi olla hyötyä tilanteissa, joissa rivien järjestys on tärkeää, kuten sijoituksissa tai kilpailussa.

Tämä lähestymistapa ei kuitenkaan välttämättä ole optimaalinen, jos ryhmien määrä on liian suuri tai yksittäiset muutokset eivät vaikuta kokonaistulokseen.

Joten toinen vaihtoehto on käyttää "päivitys" -tulostustilaa, joka luo uuden viestin vain muuttuneille ryhmille. Katso alempaa:

Kysely, jonka tulostustila on "päivitys". Kuva tekijältä.

"Liitä"-tila ei ole käytettävissä ryhmittelyä sisältäville kyselyille, joten en voi näyttää samaa työtä käyttäen. Mutta mielestäni se on yksinkertaisin tila, se aina lisää tulosteeseen uuden tietueen.

Nämä tulostustilat on helpompi ymmärtää, jos harkitset tulosten tallentamista taulukkoon. Täydellisessä tulostustilassa taulukko kirjoitetaan uudelleen jokaiselle käsitellylle uudelle viestille, päivitystilassa vain ne rivit, joilla jokin päivitys tapahtui, ja liite lisää aina uuden rivin loppuun.

Tumbling aikaikkuna — Aggregointi aikaväleillä

Suoratoistojärjestelmissä viesteillä on kaksi erilaista aikaleimaa, jotka liittyvät niihin: Tapahtumaaika — Aika, jolloin viesti luotiin, tässä tapauksessa anturin lukuaika, ja Käsittelyaika — Kun käsittelyagentti lukee viestin, meidän tapauksessamme se saavuttaa Sparkin.

Tärkeä ominaisuus virrankäsittelytyökaluissa on kyky käsitellä tapahtuma-aikakäsittelyä. Häivytysikkunat ovat ei-päällekkäisiä kiinteitä aikavälejä, joita käytetään aggregaatioiden tekemiseen käyttämällä tapahtuma-aikasarakkeita. Yksinkertaisemmin sanottuna ne viipaloivat aikajanan samankokoisiksi siivuiksi, jotta jokainen tapahtuma kuuluu yhteen aikaväliin.

Laske esimerkiksi 5 minuutin välein, kuinka monta ajoneuvoa havaittiin viimeisen 5 minuutin aikana.

5min kaatuva ikkuna. Kuva tekijältä.

Alla oleva koodi havainnollistaa tätä:

Tällainen käsittely voi olla erittäin hyödyllistä monissa tilanteissa. Palatakseni aiemmin ehdotettuun ruuhkatunnistimeen, yksi mahdollinen tapa on mitata ajoneuvojen keskinopeus 10 minuutin ikkunassa ja katsoa, ​​onko se tietyn kynnyksen alapuolella.

Tapahtuma-aikainen käsittely on monimutkainen aihe. Kaikkea voi tapahtua sen käsittelyssä, kuten viestien katoaminen, liian myöhään saapuminen tai epäkunnossapito. Sparkilla on useita mekanismeja yrittääkseen lieventää ongelmia, kuten vesileimat, joihin emme keskity.

Aikaikkunoita voidaan käyttää myös yhdessä muiden sarakkeiden kanssa groupBy(). Alla oleva esimerkki laskee ajoneuvojen määrän tyypin mukaan 5 minuutin ikkunassa.

Liukuva aikaikkuna — Aikavälien joustavuus

Liukuvat aikaikkunat ovat joustavia ikkunoita. Sen sijaan, että luotaisiin ei-päällekkäisiä intervalleja, ne mahdollistavat sen määrittämisen, kuinka usein kukin intervalli luodaan.

Laske esimerkiksi 5 minuutin välein, kuinka monta ajoneuvoa havaittiin viimeisen 30 minuutin aikana.

Tästä johtuen tapahtumat voivat kuulua useisiin aikaväleihin ja niitä voidaan laskea niin monta kertaa kuin tarvitaan.

Voit määrittää liukuvan ikkunan siirtämällä päivitysvälin ikkuna()-toiminto.

Katsotaanpa tulos.

Kuten näemme, meillä luodaan 30 minuutin ikkunat joka 5. minuutti.

Tämä joustavuus voi olla varsin hyödyllistä määriteltäessä tarkempia liiketoimintasääntöjä ja monimutkaisempia laukaisimia. Esimerkiksi liikenneruuhkatunnistimemme voi lähettää vastauksia 5 sekunnin välein viimeisen 10 minuutin ajalta ja luoda hälytyksen, kun auton keskinopeus laskee alle 20 km/h.

Tämä oli nopea katsaus Spark Structured Streamingin pääkonsepteihin ja siihen, miten niitä voidaan soveltaa Kafkan kanssa.

Apache Kafka ja Apache Spark ovat molemmat luotettavia ja kestäviä työkaluja, joita monet yritykset käyttävät päivittäin uskomattomien tietomäärien käsittelyyn, mikä tekee niistä yhden stream-käsittelytehtävän vahvimmista pareista.

Olemme oppineet täyttämään, kuluttamaan ja käsittelemään Kafka-aiheita Spark-töiden avulla. Tämä ei ollut vaikea tehtävä, kuten viestissä mainittiin, virrankäsittelyn API on melkein sama kuin tavallinen erä-API, vain muutamalla pienellä säädöllä.

Olemme myös keskustelleet erilaisista lähtötiloista, joistakin suoratoistosovelluksista ja siitä, miten niitä voidaan käyttää. Viimeisenä, mutta ei vähäisimpänä, tutkimme aggregaatioita aikaikkunoilla, jotka ovat yksi stream-käsittelyn tärkeimmistä ominaisuuksista.

Tämä oli jälleen nopea katsaus, ja jätän alle viitteitä, jos haluat tutkia asiaa tarkemmin.

Toivottavasti auttoi jotenkin, kiitos kun luit! 🙂

Kaikki koodit löytyvät tästä GitHub-arkisto.
Käytetty data -
Contagens Volumétricas de Radares, avoin data, Brasilian kuvernööri.

[1] Ominaisuus Deep Dive: Vesileima Apache Spark Structured Streamingissa - Max Fisher Databricks-blogissa
[2] Chambers, B. ja Zaharia, M. (2018). Spark: Lopullinen opas: Big datan käsittely on tehty yksinkertaiseksi. "O'Reilly Media, Inc."
[3] Reaaliaikainen logistiikka, toimitus ja kuljetus Apache Kafkan avulla— Kai Waehner
[4] Mukana Apache Kafka Netflix Studiossa ja Finance Worldissa – Sujuva blogi
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/

Nopea katsaus Spark Structured Streamingiin + Kafka julkaistu uudelleen lähteestä https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 kautta https:/ /towardsdatascience.com/feed

<!-

->

Aikaleima:

Lisää aiheesta Blockchain-konsultit