Gyors pillantás a Spark Structured Streaming + Kafka szolgáltatásra

Megtanulja az alapokat, hogyan használhatja ezt a nagy teljesítményű duót adatfolyam-feldolgozási feladatokhoz

Fotó Nikhita Singhal on Unsplash

Nemrég sokat tanultam az Apache Kafkáról és az Apache Sparkról, amelyek az adatmérnöki világ két vezető technológiájáról.

Az elmúlt néhány hónapban számos projektet készítettem ezek felhasználásával; "Gépi tanulási adatfolyam Kafka, Debezium és BentoML segítségével” egy példa. Célom, hogy megtanuljam, hogyan lehet hatékony adatfolyamokat létrehozni ezekkel a modern, híres eszközökkel, és megértsem azok előnyeit és hátrányait.

Az elmúlt hónapokban már foglalkoztam azzal, hogyan hozhatok létre ETL-folyamatokat mindkét eszköz használatával, de soha nem használva őket együtt, és ez az a hiányosság, amelyet ma pótolok.

Célunk, hogy megismerjük a Spark+Kafka segítségével egy streaming alkalmazás létrehozásának általános gondolatát, és valós adatok felhasználásával gyors pillantást adjunk annak főbb koncepcióira.

Az ötlet egyszerű – az Apache Kafka egy üzenetfolyam-küldő eszköz, ahol a producerek üzeneteket írnak a sor (úgynevezett téma), hogy a fogyasztók elolvassák.

De ez egy nagyon összetett eszköz, rugalmas elosztott üzenetküldő szolgáltatásnak készült, mindenféle kézbesítési garanciával (pontosan egyszer, egyszer, bármilyen), üzenettárolással és üzenetreplikációval, ugyanakkor rugalmasságot, méretezhetőséget és nagy áteresztőképességet is biztosít. Szélesebb körű használati esetekkel rendelkezik, mint például a mikroszolgáltatások kommunikációja, a valós idejű eseményrendszerek és a streaming ETL-folyamatok.

Az Apache Spark egy elosztott memória alapú adatátalakító motor.

Ez egy nagyon összetett eszköz is, amely mindenféle adatbázishoz, fájlrendszerhez és felhő infrastruktúrához képes kapcsolódni. Elosztott környezetekben való működésre készült, hogy párhuzamosítsa a gépek közötti feldolgozást, nagy teljesítményű átalakításokat valósítson meg a lusta kiértékelési filozófiája és a lekérdezésoptimalizálás segítségével.

A legmenőbb az egészben, hogy a nap végére a kód csak a szokásos SQL-lekérdezés vagy (majdnem) a Python+pandas szkript, és az összes boszorkányságot egy kellemes, felhasználóbarát, magas szintű API-ban absztraháljuk.

Csatlakozzon ehhez a két technológiához, és tökéletes párost találunk egy streaming ETL-csővezeték felépítéséhez.

Belo Horizonte (BH), Minas Gerais (Brazília) fővárosának forgalmi érzékelőinek adatait fogjuk használni. Ez egy hatalmas adatkészlet, amely a város több helyén mért forgalom mérését tartalmazza. Mindegyik érzékelő rendszeresen észleli az adott helyen közlekedő jármű típusát (autó, motorkerékpár, busz/teherautó), sebességét és hosszát (és egyéb információkat, amelyeket nem fogunk használni).

Ez az adatkészlet pontosan a streaming rendszerek egyik klasszikus alkalmazását képviseli – az érzékelők egy csoportját, amelyek folyamatosan küldik leolvasásaikat a terepen.

Ebben a forgatókönyvben az Apache Kafka absztrakciós rétegként használható az érzékelők és az adatokat fogyasztó alkalmazások között.

A Kafka absztrakciós rétegként szolgál a források és szolgáltatások között. A kép szerzője.

Ezzel a fajta infrastruktúrával mindenféle (ún. valós idejű eseményvezérelt rendszerek, mint egy program, amely észleli és figyelmezteti a forgalmi dugókat, amikor az átlagsebesség csökkenésével hirtelen megnő a járművek száma.

És itt jön képbe az Apache Spark.

Van egy natív modulja az adatfolyam-feldolgozáshoz, az úgynevezett Spark Structured Streaming, amely képes csatlakozni Kafkához és feldolgozni az üzeneteit.

A környezet kialakítása

Csak dokkolóra és docker-kompozícióra van szüksége.

Docker-compose fájlkonfigurációt fogunk használni a következő tárolók alapján: link szikra, link kafka.

A ./src kötetben fogjuk elhelyezni a forgatókönyveinket.

A környezet elindításához csak fuss

dokkoló-összeállít

Ebben az összes kód elérhető GitHub tárház.

Az egyik dolog, ami a legjobban tetszett, amikor elkezdtem tanulmányozni a Sparkot, az a hasonlóság volt az írott kód és a szokásos python+pandas szkriptjeim között. Nagyon könnyű volt költözni.

Ugyanezt a logikát követve a Spark streaming modulja nagyon hasonlít a szokásos spark-kódhoz, így könnyen áttérhet a kötegelt alkalmazásokból a stream alkalmazásokba.

Mindezek ellenére a következő szakaszokban a Spark strukturált streaming sajátosságainak megismerésére fogunk összpontosítani, vagyis arra, hogy milyen új funkciói vannak.

Az első munkánk

Kezdjük lassan, és építsünk egy játékpéldát

Az első dolog, hogy hozzunk létre egy Kafka-témát, ahonnan a mi szikrafelhasználásunk fogja felemészteni az üzeneteket.

Ezt végzi el a Kafka konténerterminál elérése és végrehajtása:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic teszt_téma

Ahhoz, hogy szimuláljunk egy producert, aki üzeneteket ír erről a témáról, használjuk a kafka-konzol-producer. A tartály belsejében is:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Mostantól minden terminálba beírt sor üzenetként kerül elküldésre a teszttémának. A „:” karakter az üzenet kulcsának és értékének elválasztására szolgál (kulcs:érték).

Hozzon létre egy Spark-feladatot ennek a témakörnek a felhasználásához.

A kódot be kell helyezni a /src/streaming mappát (semmi különleges, csak az általam választott mappa).

A legfontosabb dolog, amit meg kell jegyeznünk, hogy az attribútumokat használjuk readStream és a writeStream, a normál olvasás és írás helyett. Ez a fő szempont, ami miatt a Spark streaming alkalmazásként kezeli a munkánkat.

A Kafkához való csatlakozáshoz meg kell adni a szervert és a témát. Az opció startOffsets=“legkorábban” mondja Sparknak, hogy az elejétől olvassa el a témát. Azért is, mert Kafka az üzeneteit tárolja kétkomponensű formában, dekódolni kell őket húr.

A többi lehetőséget tovább vizsgálják.

Most nyissa meg a Spark-tárolót, és futtassa a feladatot.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Néhány másodperces konfigurálás után elkezdi fogyasztani a témát.

Kafkától érkező üzenetek felkeltése. A kép szerzője.

A Spark Streaming működik mikro-adagolás módban, és ezért látjuk a „kötegelt” információt, amikor az üzeneteket fogyaszt.

A mikrokötegelés némileg a teljes „igaz” adatfolyam között van, ahol az összes üzenetet egyenként dolgozzák fel, amint megérkeznek, és a szokásos kötegelt, ahol az adatok statikusak maradnak, és igény szerint fogyasztják. A Spark vár egy ideig, és megpróbálja felhalmozni az üzeneteket, hogy együtt dolgozza fel őket, csökkentve ezzel a többletköltséget és növelve a várakozási időt. Ez az Ön igényei szerint hangolható.

Nem vagyok szupergyors gépíró, ezért a Spark feldolgozza az üzenetet, mielőtt újakat vehetnék fel az aktuális kötegbe.

És ez volt az első streaming munkánk!

Remélem, érzed: nem nehéz lekódolni egy adatfolyam-feldolgozási feladatot, de van néhány hiba.

Adatok írása Kafka adatfolyamba

Itt az ideje, hogy elkezdjen játszani az érzékelő adataival.

Letöltheti postai irányítószám fájlt 2022 AUGUSTUSBÓL, és csomagolja ki a /adat hangerő. Az adatok eredetileg JSON-ban vannak, és körülbelül 23 Gb helyet foglalnak el. Első lépésként parkettává kell alakítani a lemezterület és az olvasási idő optimalizálása érdekében.

Az ehhez szükséges spark jobokat a GitHub adattárában részletezik, mindössze annyit kell tennie, hogy végrehajtja őket:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

A géptől függően a végrehajtás eltarthat egy ideig. De kifizetődő, a végleges parketta fájl mérete ~1Gb (több mint 20x kisebb) és sokkal gyorsabban olvasható.

Létre kell hoznunk a Kafka témát is, hogy megkapjuk üzeneteinket:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Opcionálisan, ha szeretné megjeleníteni a beérkező üzeneteket, lehetőség van konzolos fogyasztó beállítására.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Egy Kafka-témában egyszerű adatot írni, de van néhány részlete.

A strukturált adatfolyamban az alapértelmezett viselkedés az, hogy nem próbálunk következtetni az adatsémára (oszlopok és típusaik), ezért át kell adnunk egyet.

A Kafka-üzenetek csak kulcs-érték bináris karakterláncpárok, így adatainkat ebben a formátumban kell ábrázolnunk. Ez könnyen elérhető úgy, hogy az összes sort JSON karakterláncokká konvertálja, binárisan kódolja, és az eredményt az „érték” oszlopban tárolja.

Oszlopok átalakítása JSON-karakterláncokká. A kép szerzője.

Az üzenetkulcsok nagyon fontosak a Kafkában, de nem lesznek hasznosak a teszteink során, így minden üzenet ugyanaz lesz.

Mint korábban említettük, ez az adatkészlet HATALMAS, ezért a beszúrt üzenetek számát 500,000 XNUMX-re korlátoztam.

Végül átadjuk a Kafka szervert és a témát, valamint egy „ellenőrzőpontHely” ahol a szikra tárolja a végrehajtás előrehaladását, ami hasznos a hibákból való helyreállításhoz.

A munka elvégzése:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Adatok beszúrása Kafkába. A kép szerzője.

A bal oldalon a Spark feladat olvassa be a fájlt, a jobb oldalon a kafka-konzol-fogyasztó megjeleníti a beérkező üzeneteket.

Forgalmi témánk fel van töltve, és szinte feldolgozásra kész.

Fontos megjegyezni, hogy a témánkat csak tanulási célból gyújtómunkával töltöttük fel. Valós forgatókönyv esetén maguk az érzékelők küldik a leolvasásokat közvetlenül Kafkának.

Ennek a dinamikus viselkedésnek a szimulálására az alábbi szkript 1 másodpercenként ír egy sort a témához.

Kimeneti módok — A járművek számának számlálása típusonként

Továbblépve hozzunk létre egy feladatot a járművek számának típusonkénti megszámlálására.

A „Classificação” (Osztályozás) oszlop az észlelt járműtípust tartalmazza.

Ahogy a témából olvasunk, a JSON bináris karakterláncokat vissza kell konvertálnunk oszlopos formátumba.

Ha ez megtörtént, a lekérdezés a szokásos módon felépíthető. Érdekes megjegyezni, hogy a lekérdezés szíve csak a válasszukcsoportosítszámít() szekvencia, a többi a streaming logikához kapcsolódik.

Ideje tehát foglalkozni a outputMode() választási lehetőség.

Egy adatfolyam-alkalmazás kimeneti módja határozza meg, hogyan szeretnénk (újra)számítani és írni az eredményeket, amikor új adatok érkeznek.

Három különböző értéket vehet fel:

  • mellékel: Csak új rekordok hozzáadása a kimenethez.
  • teljes: Minden új rekord teljes eredményének újraszámítása.
  • Frissítések: Frissítse a megváltozott rekordokat.

Ezek a módok a megírt alkalmazástól függően értelmesek vagy nem. Például előfordulhat, hogy a „teljes” módnak nincs értelme, ha csoportosítást vagy rendezést hajtanak végre.

Végezzük el a munkát „befejezett” módban, és nézzük meg az eredményeket.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Teherautó, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. A kép szerzője.

Amint új rekordok kerülnek be az adatfolyamba (lásd a terminált a jobb oldalon), a feladat újraszámítja a teljes eredményt. Ez hasznos lehet olyan helyzetekben, amikor fontos a sorok sorrendje, például rangsorolás vagy verseny.

Ez a megközelítés azonban nem biztos, hogy optimális, ha a csoportok száma túl nagy, vagy az egyes változtatások nem befolyásolják a teljes eredményt.

Tehát egy másik lehetőség a „frissítés” kimeneti mód használata, amely csak a megváltozott csoportok számára generál új üzenetet. Lásd alább:

A lekérdezés „frissítés” kimeneti móddal. A kép szerzője.

A „hozzáfűzés” mód nem érhető el a csoportosítással rendelkező lekérdezéseknél, így nem tudom ugyanazt a feladatot használni. De szerintem ez a legegyszerűbb mód mindig új rekordot ad a kimenethez.

Ezeket a kimeneti módokat egyszerűbb megérteni, ha az eredményeket táblázatba menti. A teljes kimeneti módban a táblázat minden feldolgozott új üzenethez újraírásra kerül, frissítési módban csak azokat a sorokat írja át, ahol frissítés történt, és a hozzáfűzés mindig egy új sort ír a végére.

Zuhanó időablak — Összesítés időintervallumok használatával

A streaming rendszerekben az üzenetekhez két különböző időbélyeg tartozik: Esemény ideje — Az üzenet létrehozásának időpontja, esetünkben a szenzor olvasási ideje, és Feldolgozási idő — Amikor a feldolgozó ügynök elolvassa az üzenetet, esetünkben amikor eléri Sparkot.

Az adatfolyam-feldolgozó eszközök fontos jellemzője az eseményidő-feldolgozás kezelésének képessége. A bukdácsoló ablakok nem átfedő, rögzített időintervallumok, amelyek az eseményidő oszlopok használatával történő összesítéshez használhatók. Egyszerűbben fogalmazva, az idővonalat egyforma méretű szeletekre vágják, így minden esemény egyetlen intervallumhoz tartozik.

Például számolja meg 5 percenként, hogy hány járművet észleltek az elmúlt 5 percben.

5 perces bukdácsoló ablak. A kép szerzője.

Az alábbi kód ezt szemlélteti:

Ez a fajta feldolgozás számos helyzetben rendkívül hasznos lehet. Visszatérve a korábban javasolt forgalmi dugóérzékelőre, az egyik lehetséges megközelítés az, hogy egy 10 perces ablakban megmérjük a járművek átlagsebességét, és megnézzük, hogy egy bizonyos küszöb alatt van-e.

Az eseményidő-feldolgozás összetett téma. Minden megtörténhet, amikor foglalkozol vele, például az üzenetek elvesznek, túl későn érkeznek meg, vagy meghibásodnak. A Spark számos mechanizmussal próbálja enyhíteni a problémákat, például vízjelek, amire nem fogunk összpontosítani.

Az időablakok a következő oszlopokkal együtt is használhatók csoportosít(). Az alábbi példa a járművek számát típusonként számolja meg egy 5 perces ablakban.

Csúszó időablak — Az időintervallumok rugalmassága

A csúszó időablakok a bukdácsoló ablakok rugalmasabbá teszik. A nem átfedő intervallumok létrehozása helyett lehetővé teszik annak meghatározását, hogy az egyes intervallumok milyen gyakran legyenek létrehozva.

Például 5 percenként számolja meg, hány járművet észleltek az elmúlt 30 percben.

Emiatt az események sok intervallumhoz tartozhatnak, és annyiszor számolhatók, ahányszor szükséges.

Csúszóablak meghatározásához egyszerűen adja át a frissítési időközt a ablak()funkció.

Lássuk a kimenetet.

Amint látjuk, 30 percenként 5 perces ablakok jönnek létre.

Ez a rugalmasság nagyon hasznos lehet konkrétabb üzleti szabályok és összetettebb triggerek meghatározásához. Forgalmi dugóérzékelőnk például 5 másodpercenként tud válaszokat küldeni az elmúlt 10 percről, és riasztást küld, ha az autó átlagsebessége 20 km/h alá csökken.

Ez egy gyors áttekintés volt a Spark Structured Streaming főbb koncepcióiról és arról, hogyan alkalmazhatók a Kafkával.

Az Apache Kafka és az Apache Spark megbízható és robusztus eszközök, amelyeket sok vállalat használ naponta hihetetlen mennyiségű adat feldolgozására, így az egyik legerősebb pár a streamfeldolgozási feladatban.

Megtanultuk, hogyan töltsünk fel, használjunk és dolgozzunk fel Kafka-témákat a Spark-feladatok segítségével. Ez nem volt nehéz feladat, amint azt a bejegyzésben említettük, a stream feldolgozó API szinte megegyezik a szokásos kötegelt API-val, csak néhány apró módosítással.

Megbeszéltük a különböző kimeneti módokat is, a streamalkalmazásokra jellemző dolgokat, és azt, hogy mindegyik hogyan használható. Végül, de nem utolsósorban megvizsgáltuk az időablakokkal rendelkező aggregációkat, amelyek az adatfolyam-feldolgozás egyik fő képessége.

Ez megint csak egy gyors pillantás volt, és alább hagyok néhány hivatkozást, ha mélyebbre szeretnél tárni magad.

Remélem segítettem valahogy, köszönöm, hogy elolvastad! 🙂

Ebben az összes kód elérhető GitHub tárház.
Felhasznált adatok –
Contagens Volumetricas de Radares, Nyílt adatok, brazil kormány.

[1] Deep Dive szolgáltatás: Vízjel az Apache Spark Structured Streamingben - Max Fisher a Databricks blogon
[2] Chambers, B. és Zaharia, M. (2018). Spark: A végleges útmutató: A nagy adatfeldolgozás egyszerűbbé vált. „O'Reilly Media, Inc.”.
[3] Valós idejű logisztika, szállítás és szállítás az Apache Kafkával– Kai Waehner
[4] Apache Kafka közreműködésével a Netflix Stúdióban és a Finance Worldben — Összefolyó blog
[5] Spark Streaming és Kafka – https://sparkbyexamples.com/

Gyors áttekintés a Spark Structured Streamingről + Kafka újraközölve a következő forrásból: https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 a következőn keresztül: https:/ /towardsdatascience.com/feed

<!–

->

Időbélyeg:

Még több Blockchain tanácsadók