Spark Yapılandırılmış Akış + Kafka'ya Hızlı Bir Bakış

Bu güçlü ikilinin akış işleme görevleri için nasıl kullanılacağına ilişkin temel bilgileri öğrenme

Fotoğraf Nikhita Singhal on Unsplash

Son zamanlarda veri mühendisliği dünyasının önde gelen iki teknolojisi olan Apache Kafka ve Apache Spark hakkında çok fazla çalışmaya başladım.

Son birkaç ayda bunları kullanarak birkaç proje yaptım; “Kafka, Debezium ve BentoML ile Makine Öğrenimi Yayını” bir örnektir. Odak noktam, bu modern ve ünlü araçlarla güçlü veri boru hatlarının nasıl oluşturulacağını öğrenmek ve bunların avantaj ve dezavantajları hakkında fikir sahibi olmaktır.

Geçtiğimiz aylarda, her iki aracı kullanarak ancak bunları hiçbir zaman birlikte kullanmadan ETL işlem hatlarının nasıl oluşturulacağını zaten ele almıştım ve bugün dolduracağım boşluk bu.

Amacımız Spark+Kafka ile bir akış uygulaması oluşturmanın ardındaki genel fikri öğrenmek ve gerçek verileri kullanarak uygulamanın ana kavramlarına hızlı bir bakış sunmaktır.

Fikir basit; Apache Kafka, üreticilerin mesajları kuyruğun bir ucuna (buna denir) yazdığı bir mesaj akış aracıdır. konu) diğer taraftan tüketiciler tarafından okunacak.

Ancak bu, her türlü teslimat garantisi (tam olarak bir kez, bir kez, herhangi biri), mesaj depolama ve mesaj çoğaltma özelliklerine sahip, aynı zamanda esneklik, ölçeklenebilirlik ve yüksek verim sağlayan, esnek bir dağıtılmış mesajlaşma hizmeti olarak tasarlanmış çok karmaşık bir araçtır. Mikro hizmet iletişimi, gerçek zamanlı olay sistemleri ve akışlı ETL işlem hatları gibi daha geniş bir kullanım senaryosuna sahiptir.

Apache Spark, dağıtılmış bellek tabanlı bir veri dönüştürme motorudur.

Aynı zamanda her türlü veritabanına, dosya sistemine ve bulut altyapısına bağlanabilen çok karmaşık bir araçtır. Makineler arasındaki işlemleri paralelleştirmek, tembel değerlendirme felsefesini ve sorgu optimizasyonlarını kullanarak yüksek performanslı dönüşümler elde etmek için dağıtılmış ortamlarda çalışacak şekilde tasarlanmıştır.

Bunun güzel yanı, günün sonunda, kodun yalnızca her zamanki SQL sorgunuz veya (neredeyse) Python + pandas betiğiniz olması ve tüm büyücülüklerin güzel, kullanıcı dostu, yüksek seviyeli bir API altında soyutlanmış olmasıdır.

Bu iki teknolojiye katılın ve akışlı bir ETL hattı oluşturmak için mükemmel bir eşleşme elde edin.

Minas Gerais'in (Brezilya) başkenti Belo Horizonte (BH) şehrindeki trafik sensörlerinden elde edilen verileri kullanacağız. Şehrin çeşitli yerlerindeki trafik akışının ölçümlerini içeren devasa bir veri seti. Her sensör, o konumda hareket eden aracın türünü (araba, motosiklet, otobüs/kamyon), hızını ve uzunluğunu (ve kullanmayacağımız diğer bilgileri) periyodik olarak algılar.

Bu veri seti, akış sistemlerine yönelik klasik uygulamalardan birini tam olarak temsil ediyor; okumalarını sahadan sürekli olarak gönderen bir grup sensör.

Bu senaryoda Apache Kafka, sensörler ve onların verilerini tüketen uygulamalar arasında bir soyutlama katmanı olarak kullanılabilir.

Kafka, kaynaklar ve hizmetler arasında bir soyutlama katmanı olarak kullanılır. Resim Yazara aittir.

Bu tür bir altyapı ile her türlü (sözde) binayı inşa etmek mümkündür. gerçek zamanlı olay odaklı sistemlerOrtalama hızda bir düşüşle birlikte araç sayısı aniden arttığında trafik sıkışıklığını tespit eden ve uyarı veren bir program gibi.

Apache Spark'ın devreye girdiği yer burasıdır.

Akış işleme için yerel bir modüle sahiptir. Spark Yapılandırılmış Yayın, Kafka'ya bağlanıp mesajlarını işleyebilir.

Ortamı kurma

İhtiyacınız olan tek şey docker ve docker-compose.

Aşağıdaki depolara dayalı olarak docker tarafından oluşturulan bir dosya yapılandırması kullanacağız: bağlantı kıvılcımı, bağlantı kafka.

The ./kaynak hacim komut dosyalarımızı koyacağımız yerdir.

Ortamı başlatmak için sadece çalıştırın

docker-oluşturmak

Bütün kodlar bunda mevcut GitHub deposu.

Spark'ı incelemeye başladığımda en çok hoşuma giden şeylerden biri, onun için yazılan kod ile her zamanki python+pandas betiklerim arasındaki benzerlikti. Göç etmek çok kolaydı.

Aynı mantığı izleyen Spark'ın akış modülü, olağan kıvılcım koduna çok benzer ve toplu uygulamalardan akışlı uygulamalara geçişi kolaylaştırır.

Bununla birlikte, sonraki bölümlerde Spark yapılandırılmış akışının özelliklerini, yani hangi yeni özelliklere sahip olduğunu öğrenmeye odaklanacağız.

İlk işimiz

Yavaştan başlayalım ve bir oyuncak örneği oluşturalım

Yapılacak ilk şey, kıvılcım işimizin mesajları tüketeceği bir Kafka konusu oluşturmak.

Bu tarafından yapılır Kafka konteyner terminaline erişim ve yürütülüyor:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Bu konuyla ilgili mesaj yazan bir yapımcıyı simüle etmek için kafka-konsol-yapımcısı. Ayrıca kabın içinde:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Artık terminalde yazılan her satır test konusuna mesaj olarak gönderilecek. Mesajın anahtarını ve değerini (anahtar:değer) ayırmak için “:” karakteri kullanılır.

Bu konuyu tüketmek için bir Spark işi oluşturalım.

Kodun içine yerleştirilmesi gerekiyor /src/akış klasör (özel bir şey yok, yalnızca seçtiğim klasör).

Dikkat edilmesi gereken en önemli şey, nitelikleri kullanmamızdır. okuma Akışı ve yazma Akışı, normal okuma ve yazma yerine. Spark'ın işimizi bir akış uygulaması olarak görmesini sağlayan ana özellik budur.

Kafka'ya bağlanmak için sunucuyu ve konuyu belirtmek gerekir. Seçenek başlangıçOffsetleri=“en erken” Spark'a konuyu baştan okumasını söyler. Ayrıca Kafka mesajlarını içinde sakladığı için ikili şeklinde kodlarının çözülmesi gerekiyor dizi.

Diğer seçenekler daha ayrıntılı olarak incelenecektir.

Şimdi Spark konteynerine erişelim ve işi çalıştıralım.

spark-submit --packages org.Apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Birkaç saniyelik yapılandırmanın ardından konuyu tüketmeye başlayacaktır.

Kafka'dan kıvılcım tüketen mesajlar. Resim Yazara aittir.

Spark Streaming'de çalışıyor mikro dozajlama modundadır ve bu nedenle mesajları tüketirken “toplu” bilgiyi görüyoruz.

Mikro toplu işlem, tüm mesajların geldikleri anda ayrı ayrı işlendiği tam "gerçek" akış ile verilerin statik kaldığı ve talep üzerine tüketildiği normal toplu iş arasında yer alır. Spark, mesajları bir arada işlemek için bir süre bekleyecek, böylece ek yükü azaltacak ve gecikmeyi artıracaktır. Bu ihtiyaçlarınıza göre ayarlanabilir.

Çok hızlı yazan biri değilim, bu nedenle Spark, mevcut gruba yenilerini eklemeden önce mesajı işler.

Ve bu bizim ilk yayın işimizdi!

Umarım şu duyguyu anlamışsınızdır: Bir akış işleme işini kodlamak zor değil, ancak bazı sorunlar var.

Kafka akışına veri yazma

Artık sensör verileriyle oynamaya başlama zamanı.

Sen indirebilirsiniz zip AĞUSTOS 2022'deki dosyayı çıkarın ve / veri hacim. Veriler orijinal olarak JSON'dadır ve yaklaşık 23 Gb alan kaplar. Yapılacak ilk şey, disk alanını ve okuma süresini optimize etmek için onu parkeye dönüştürmektir.

Bunu yapmak için gereken kıvılcım işleri GitHub deposunda ayrıntılı olarak açıklanmıştır; tek yapmanız gereken bunları yürütmektir:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Makinenize bağlı olarak yürütme biraz zaman alabilir. Ancak karşılığını verir, son parke dosya boyutu ~1 Gb'dir (20 kattan daha küçük) ve okunması çok daha hızlıdır.

Mesajlarımızı almak için ayrıca Kafka konusunu oluşturmamız gerekiyor:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic trafik_sensor

İsteğe bağlı olarak, gelen mesajların görüntülenmesini istiyorsanız bir konsol tüketicisi ayarlamak mümkündür.

kafka-console-consumer.sh --topic Traffic_sensor --bootstrap-server localhost:9092

Kafka konusuna veri yazmak kolaydır ancak bazı detayları vardır.

Yapılandırılmış akışta, varsayılan davranış, veri şemasını (sütunlar ve türleri) çıkarmaya çalışmamaktır, bu nedenle bir tane aktarmamız gerekir.

Kafka mesajları yalnızca anahtar-değer ikili dize çiftleridir, dolayısıyla verilerimizi bu formatta temsil etmemiz gerekir. Bu, tüm satırları JSON dizelerine dönüştürerek, bunları ikili olarak kodlayarak ve sonucu "değer" sütununda saklayarak kolayca başarılabilir.

Sütunları JSON dizelerine dönüştürme. Resim Yazara aittir.

Mesaj anahtarları Kafka'da çok önemlidir ancak testlerimizde işe yaramayacağından tüm mesajlar aynı olacaktır.

Daha önce de belirttiğim gibi, bu veri seti BÜYÜK, bu yüzden eklenen mesaj sayısını 500,000 ile sınırladım.

Son olarak Kafka sunucusunu ve konusunu geçiyoruz ve bir “kontrol noktasıKonumu” kıvılcımın yürütme ilerlemesini depolayacağı yer, hatalardan kurtulmak için kullanışlıdır.

İşin yürütülmesi:

spark-submit --packages org.Apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Kafka'ya veri ekleme. Resim Yazara aittir.

Solda Spark işi dosyayı okur, sağda ise bir kafka-konsol-tüketici gelen mesajları görüntüler.

Trafik konumuz dolduruldu ve neredeyse işlenmeye hazır.

Konumuzu sadece öğrenme amacıyla doldurmak için bir kıvılcım çalışması kullandığımızı hatırlamak önemlidir. Gerçek bir senaryoda sensörler okumaları doğrudan Kafka'ya gönderecektir.

Bu dinamik davranışı simüle etmek için aşağıdaki komut dosyası, konuya her 1 saniyede bir 2.5 satır yazar.

Çıkış modları - Türe göre araç sayısını sayma

Devam edelim, araç sayısını türe göre saymak için bir iş oluşturalım.

"Classificação" (Sınıflandırma) sütunu tespit edilen araç tipini içerir.

Konuyu okurken JSON ikili dizelerini tekrar sütunlu formata dönüştürmemiz gerekiyor.

Bu yapıldıktan sonra sorgu her zamanki gibi oluşturulabilir. Sorgu kalbinin yalnızca seçmek().grupBy().saymak() dizisi, geri kalan her şey akış mantığına bağlıdır.

Bu yüzden meseleyi ele almanın zamanı geldi çıkışModu() seçenek.

Bir akış uygulamasının çıktı modu, yeni veriler geldiğinde sonuçları nasıl (yeniden) hesaplamak ve yazmak istediğimizi belirtir.

Üç farklı değer alabilir:

  • eklemek: Çıktıya yalnızca yeni kayıtlar ekleyin.
  • Tamamla: Her yeni kayıt için tam sonucu yeniden hesaplayın.
  • Güncelleme: Değiştirilen kayıtları güncelleyin.

Bu modlar, yazılan uygulamaya bağlı olarak anlamlı olabilir veya olmayabilir. Örneğin herhangi bir gruplama veya sıralama yapıldığında “tamamlandı” modu bir anlam ifade etmeyebilir.

İşi “tamamlandı” modunda yürütelim ve sonuçlara bakalım.

spark-submit --packages org.Apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão - Kamyon, Automóvel-Araba, Indefinido-Tanımsız, Ônibus-Otobüs, Moto-Motosiklet. Resim Yazara aittir.

Akışa yeni kayıtlar eklendikçe (sağdaki terminale bakın), iş tam sonucu yeniden hesaplar. Bu, sıralama veya rekabet gibi satır sıralamasının önemli olduğu durumlarda yararlı olabilir.

Ancak grup sayısı çok fazlaysa veya bireysel değişiklikler genel sonucu etkilemiyorsa bu yaklaşım ideal olmayabilir.

Dolayısıyla diğer bir seçenek de yalnızca değişen gruplar için yeni bir mesaj üreten "güncelleme" çıkış modunu kullanmaktır. Aşağıya bakınız:

Çıkış modu “güncelleme” olan sorgu. Resim Yazara aittir.

Gruplandırmalı sorgularda "ekleme" modu mevcut olmadığından aynı işi kullanarak gösteremeyeceğim. Ama bunun en basit mod olduğunu düşünüyorum. her zaman çıktıya yeni bir kayıt ekler.

Sonuçları bir tabloya kaydetmeyi düşünürseniz, bu çıktı modlarını anlamak daha kolaydır. Tam çıktı modunda, tablo işlenen her yeni mesaj için yeniden yazılacak, güncelleme modunda ise yalnızca bazı güncellemelerin gerçekleştiği satırlar yeniden yazılacak ve ekleme her zaman sonuna yeni bir satır ekleyecektir.

Değişen zaman penceresi — Zaman aralıklarını kullanarak toplama

Akış sistemlerinde mesajların kendileriyle ilgili iki farklı zaman damgası vardır: Olay zamanı — Mesajın oluşturulduğu zaman, bizim durumumuzda sensörün okuma süresi ve İşleme süresi — Mesajın işleme aracısı tarafından okunduğu zaman, bizim durumumuzda, Spark'a ulaşır.

Akış işleme araçlarının önemli bir özelliği, olay zamanı işlemeyi yönetebilme yeteneğidir. Yuvarlanan pencereler, olay zamanı sütunlarını kullanarak toplamalar yapmak için kullanılan, örtüşmeyen sabit zaman aralıklarıdır. Daha basit bir şekilde ifade etmek gerekirse, zaman çizelgesini eşit büyüklükte dilimlere bölerler, böylece her olay tek bir aralığa ait olur.

Örneğin her 5 dakikada bir son 5 dakikada kaç aracın tespit edildiğini sayın.

5 dakikalık yuvarlanma penceresi. Resim Yazara aittir.

Aşağıdaki kod bunu göstermektedir:

Bu tür işlemler birçok durumda son derece yararlı olabilir. Daha önce önerilen trafik sıkışıklığı algılayıcısına geri dönecek olursak, olası bir yaklaşım, araçların ortalama hızını 10 dakikalık bir aralıkta ölçmek ve belirli bir eşiğin altında olup olmadığını görmektir.

Olay anında işleme karmaşık bir konudur. Bununla uğraşırken mesajların kaybolması, çok geç gelmesi veya kullanım dışı kalması gibi her şey olabilir. Spark'ın sorunları hafifletmeye yönelik çeşitli mekanizmaları var: filigran, buna odaklanmayacağız.

Zaman pencereleri aynı zamanda diğer sütunlarla birlikte de kullanılabilir. grupBy(). Aşağıdaki örnekte, 5 dakikalık bir pencerede türe göre araç sayısı sayılmaktadır.

Kayan zaman penceresi - Zaman aralıklarında esneklik

Kayan zaman pencereleri, yuvarlanan pencerelerin esnekleştirilmesidir. Örtüşmeyen aralıklar oluşturmak yerine, her aralığın ne sıklıkta oluşturulacağını tanımlamaya olanak tanırlar.

Örneğin her 5 dakikada bir son 30 dakikada kaç aracın tespit edildiğini sayın.

Bu nedenle olaylar birçok aralığa ait olabilir ve gerektiği kadar sayılabilir.

Bir kayan pencere tanımlamak için güncelleme aralığını pencere() işlev.

Çıktıyı görelim.

Görebildiğimiz gibi, her 30 dakikada bir oluşturulan 5 dakikalık penceremiz var.

Bu esneklik, daha spesifik iş kurallarının ve daha karmaşık tetikleyicilerin tanımlanmasında oldukça yararlı olabilir. Örneğin trafik sıkışıklığı dedektörümüz son 5 dakikaya ilişkin her 10 saniyede bir yanıt gönderebiliyor ve ortalama araç hızı 20 km/saatin altına düştüğünde uyarı oluşturabiliyor.

Bu, Spark Yapılandırılmış Akışın ana kavramlarına ve bunların Kafka ile nasıl uygulanabileceğine hızlı bir bakıştı.

Apache Kafka ve Apache Spark, birçok şirket tarafından günlük olarak inanılmaz miktarda veriyi işlemek için kullanılan güvenilir ve sağlam araçlardır; bu da onları akış işleme görevindeki en güçlü çiftlerden biri haline getirir.

Spark işlerini kullanarak Kafka konularını nasıl dolduracağımızı, tüketeceğimizi ve işleyeceğimizi öğrendik. Gönderide de belirtildiği gibi bu zor bir iş değildi; akış işleme API'si, yalnızca bazı küçük ayarlamalar dışında, normal toplu API'ye neredeyse eşittir.

Ayrıca farklı çıkış modlarını, akış uygulamalarına özgü bir şeyi ve her birinin nasıl kullanılabileceğini de tartıştık. Son olarak, akış işlemenin ana yeteneklerinden biri olan zaman pencereli toplamaları araştırdık.

Yine, bu sadece hızlı bir bakıştı ve daha derin bir araştırma yapmak istiyorsanız aşağıya bazı referanslar bırakacağım.

Umarım bir şekilde yardımcı olmuşumdur, okuduğunuz için teşekkür ederim! 🙂

Bütün kodlar bunda mevcut GitHub deposu.
Kullanılan veriler —
Radar Hacimleri, Açık veri, Brezilya Valisi

[1] Özelliğin Ayrıntılı İncelemesi: Apache Spark Yapılandırılmış Akışta Filigran Ekleme — Databricks blogunda Max Fisher
[2] Chambers, B. ve Zaharia, M. (2018). Spark: Kesin kılavuz: Büyük veri işleme artık daha kolay. “O'Reilly Medya, Inc.”.
[3] Apache Kafka ile Gerçek Zamanlı Lojistik, Nakliye ve Taşımacılık— Kai Waehner
[4] Netflix Stüdyosu ve Finans Dünyasında Apache Kafka'nın Yer Alması — Birleşik blog
[5] Spark Yayını ve Kafka — https://sparkbyexamples.com/

Spark Yapılandırılmış Akış + Kafka'ya Hızlı Bir Bakış Kaynak https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 adresinden https:// aracılığıyla yeniden yayınlandı /towardsdatascience.com/feed

<!–

->

Zaman Damgası:

Den fazla Blockchain Danışmanları