Pandangan Cepat tentang Streaming Terstruktur Spark + Kafka

Mempelajari dasar-dasar cara menggunakan duo hebat ini untuk tugas pemrosesan aliran

Foto oleh Nikhita Singhal on Unsplash

Baru-baru ini saya mulai belajar banyak tentang Apache Kafka dan Apache Spark, dua teknologi terkemuka di dunia rekayasa data.

Saya telah membuat beberapa proyek dengan menggunakannya dalam beberapa bulan terakhir; โ€œStreaming Pembelajaran Mesin dengan Kafka, Debezium, dan BentoMLโ€ adalah sebuah contoh. Fokus saya adalah mempelajari cara membuat saluran data yang kuat dengan alat-alat modern yang terkenal ini dan memahami kelebihan dan kekurangannya.

Dalam beberapa bulan terakhir, saya telah membahas cara membuat pipeline ETL menggunakan kedua alat tersebut tetapi tidak pernah menggunakannya secara bersamaan, dan itulah celah yang akan saya isi hari ini.

Tujuan kami adalah mempelajari gagasan umum di balik pembuatan aplikasi streaming dengan Spark+Kafka dan memberikan gambaran sekilas tentang konsep utamanya menggunakan data nyata.

Idenya sederhana โ€” Apache Kafka adalah alat pengaliran pesan, tempat produsen menulis pesan di salah satu ujung antrean (disebut a tema) untuk dibaca oleh konsumen di sisi lain.

Namun ini adalah alat yang sangat kompleks, dibangun untuk menjadi layanan pesan terdistribusi yang tangguh, dengan segala macam jaminan pengiriman (tepat sekali, sekali, kapan saja), penyimpanan pesan, dan replikasi pesan, sekaligus memungkinkan fleksibilitas, skalabilitas, dan throughput yang tinggi. Ini memiliki serangkaian kasus penggunaan yang lebih luas, seperti komunikasi layanan mikro, sistem peristiwa real-time, dan saluran streaming ETL.

Apache Spark adalah mesin transformasi data berbasis memori terdistribusi.

Ini juga merupakan alat yang sangat kompleks, mampu terhubung dengan semua jenis database, sistem file, dan infrastruktur cloud. Ia diarahkan untuk beroperasi di lingkungan terdistribusi untuk memparalelkan pemrosesan antar mesin, mencapai transformasi kinerja tinggi dengan menggunakan filosofi evaluasi malas dan optimalisasi kueri.

Bagian kerennya adalah, pada akhirnya, kode tersebut hanyalah kueri SQL biasa atau (hampir) skrip Python+panda Anda, dengan semua sihir diabstraksi di bawah API tingkat tinggi yang ramah pengguna.

Gabungkan kedua teknologi ini dan kami akan mendapatkan pasangan yang cocok untuk membangun saluran ETL streaming.

Kami akan menggunakan data dari sensor lalu lintas di kota Belo Horizonte (BH), ibu kota Minas Gerais (Brasil). Ini adalah kumpulan data besar yang berisi pengukuran arus lalu lintas di beberapa tempat di kota. Setiap sensor secara berkala mendeteksi jenis kendaraan yang melaju di lokasi tersebut (mobil, sepeda motor, bus/truk), kecepatan dan panjangnya (dan informasi lain yang tidak akan kami gunakan).

Kumpulan data ini mewakili salah satu aplikasi klasik untuk sistem streaming โ€” sekelompok sensor yang mengirimkan pembacaannya secara terus menerus dari lapangan.

Dalam skenario ini, Apache Kafka dapat digunakan sebagai lapisan abstraksi antara sensor dan aplikasi yang menggunakan datanya.

Kafka digunakan sebagai lapisan abstraksi antara sumber dan layanan. Gambar oleh Penulis.

Dengan infrastruktur seperti ini, dimungkinkan untuk membangun segala macam (yang disebut) sistem berbasis peristiwa real-time, seperti program untuk mendeteksi dan memperingatkan kemacetan lalu lintas ketika jumlah kendaraan tiba-tiba bertambah seiring dengan penurunan kecepatan rata-rata.

Dan di situlah Apache Spark berperan.

Ia memiliki modul asli untuk pemrosesan aliran yang disebut Percikan Streaming Terstruktur, yang dapat terhubung ke Kafka dan memproses pesannya.

Menyiapkan lingkungan

Yang Anda butuhkan hanyalah buruh pelabuhan dan komposisi buruh pelabuhan.

Kami akan menggunakan konfigurasi file penulisan buruh pelabuhan berdasarkan repositori berikut: percikan tautan, tautan kafka.

Grafik ./src volume adalah tempat kita akan meletakkan skrip kita.

Untuk memulai lingkungan, jalankan saja

docker-compose up

Semua kode tersedia di sini Repositori GitHub.

Salah satu hal yang paling saya sukai ketika mulai mempelajari Spark adalah kesamaan antara kode tertulisnya dan skrip python+pandas saya yang biasa. Sangat mudah untuk bermigrasi.

Mengikuti logika yang sama, modul streaming Spark sangat mirip dengan kode percikan biasa, sehingga memudahkan migrasi dari aplikasi batch ke aplikasi streaming.

Oleh karena itu, di bagian berikut, kami akan fokus mempelajari kekhususan streaming terstruktur Spark, yaitu fitur baru apa yang dimilikinya.

Pekerjaan pertama kami

Mari kita mulai perlahan dan membuat contoh mainan

Hal pertama yang harus dilakukan adalah membuat topik Kafka dari mana tugas percikan kita akan menggunakan pesan-pesan tersebut.

Ini dilakukan oleh mengakses terminal kontainer Kafka dan mengeksekusi:

kafka-topics.sh --buat --bootstrap-server localhost:9092 --topic test_topic

Untuk mensimulasikan produser yang menulis pesan tentang topik ini, mari gunakan kafka-konsol-produser. Juga di dalam wadah:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Mulai sekarang, setiap baris yang diketik di terminal akan dikirim sebagai pesan ke topik pengujian. Karakter โ€œ:โ€ digunakan untuk memisahkan kunci dan nilai pesan (kunci:nilai).

Mari buat pekerjaan Spark untuk menggunakan topik ini.

Kode harus dimasukkan ke dalam /src/streaming folder (tidak ada yang istimewa, hanya folder yang saya pilih).

Hal utama yang perlu diperhatikan adalah kita menggunakan atribut bacaStream dan tulisStream, bukannya membaca dan menulis seperti biasa. Inilah aspek utama yang membuat Spark memperlakukan pekerjaan kita sebagai aplikasi streaming.

Untuk terhubung ke Kafka, Anda perlu menentukan server dan topik. Pilihan permulaanOffset=โ€œpaling awalโ€ memberitahu Spark untuk membaca topik dari awal. Juga, karena Kafka menyimpan pesannya di biner bentuknya, mereka perlu diterjemahkan tali.

Opsi lainnya akan dieksplorasi lebih lanjut.

Sekarang, mari akses kontainer Spark dan jalankan pekerjaannya.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Setelah beberapa detik konfigurasi, topik akan mulai dikonsumsi.

Percikan pesan yang memakan dari Kafka. Gambar oleh Penulis.

Spark Streaming berfungsi batching mikro mode, dan itulah mengapa kami melihat informasi "batch" saat menggunakan pesan.

Micro-batching berada di antara streaming โ€œsebenarnyaโ€ penuh, di mana semua pesan diproses secara individual saat pesan tiba, dan batch biasa, di mana data tetap statis dan dikonsumsi sesuai permintaan. Spark akan menunggu beberapa saat untuk mencoba mengumpulkan pesan untuk memprosesnya bersama-sama, mengurangi overhead dan meningkatkan latensi. Ini dapat disesuaikan dengan kebutuhan Anda.

Saya bukan tipe yang super cepat, jadi Spark memproses pesan tersebut sebelum saya dapat memasukkan pesan baru ke dalam kumpulan saat ini.

Dan itu adalah pekerjaan streaming pertama kami!

Saya harap Anda memahaminya: tidak sulit untuk mengkodekan pekerjaan pemrosesan aliran, tetapi ada beberapa masalah.

Menulis data ke aliran Kafka

Sekarang saatnya mulai bermain-main dengan data sensor.

Anda dapat mendownload zip file dari AGUSTUS 2022 dan ekstrak ke dalam /data volume. Data aslinya dalam JSON dan memakan ruang sekitar 23 Gb. Hal pertama yang harus dilakukan adalah mengubahnya menjadi parket untuk mengoptimalkan ruang disk dan waktu membaca.

Pekerjaan percikan untuk melakukan ini dijelaskan secara rinci di repositori GitHub, yang perlu Anda lakukan hanyalah menjalankannya:

kirim percikan /src/transform_json_to_parquet.pykirim percikan /src/join_parquet_files.py

Tergantung pada mesin Anda, eksekusi mungkin memerlukan waktu. Namun ada manfaatnya, ukuran file parket akhir adalah ~1Gb (lebih dari 20x lebih kecil) dan jauh lebih cepat untuk dibaca.

Kita juga perlu membuat topik Kafka untuk menerima pesan kita:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Secara opsional, jika Anda ingin menampilkan pesan yang masuk, Anda dapat menyiapkan konsumen konsol.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Menulis data pada topik Kafka mudah, namun memiliki beberapa detail.

Dalam streaming terstruktur, perilaku defaultnya adalah tidak mencoba menyimpulkan skema data (kolom dan tipenya), jadi kita harus meneruskannya.

Pesan Kafka hanyalah pasangan string biner nilai kunci, jadi kita perlu merepresentasikan data kita dalam format ini. Hal ini dapat dicapai dengan mudah dengan mengonversi semua baris menjadi string JSON, menyandikannya dalam biner, dan menyimpan hasilnya di kolom โ€œnilaiโ€.

Mengubah kolom menjadi string JSON. Gambar oleh Penulis.

Kunci pesan sangat penting di Kafka, namun tidak akan berguna dalam pengujian kami, jadi semua pesan akan memiliki kesamaan.

Seperti disebutkan sebelumnya, kumpulan data ini BESAR, jadi saya membatasi jumlah pesan yang dimasukkan hingga 500,000.

Terakhir, kami melewati server dan topik Kafka dan โ€œlokasi pos pemeriksaanโ€ dimana percikan akan menyimpan kemajuan eksekusi, berguna untuk memulihkan kesalahan.

Menjalankan pekerjaan:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Memasukkan data ke Kafka. Gambar oleh Penulis.

Di sebelah kiri, tugas Spark membaca file, di sebelah kanan, a kafka-konsol-konsumen menampilkan pesan yang masuk.

Topik lalu lintas kami sudah terisi dan hampir siap untuk diproses.

Penting untuk diingat bahwa kita menggunakan pekerjaan percikan untuk mengisi topik kita hanya untuk tujuan pembelajaran. Dalam skenario nyata, sensor itu sendiri akan mengirimkan pembacaan langsung ke Kafka.

Untuk menyimulasikan perilaku dinamis ini, skrip di bawah ini menulis 1 baris ke topik setiap 2.5 detik.

Mode keluaran โ€” Menghitung jumlah kendaraan berdasarkan jenis

Selanjutnya mari kita buat tugas untuk menghitung jumlah kendaraan berdasarkan jenisnya.

Kolom โ€œClassificaรงรฃoโ€ (Klasifikasi) berisi jenis kendaraan yang terdeteksi.

Saat kita membaca topik ini, kita perlu mengonversi string biner JSON kembali ke format kolom.

Setelah ini selesai, kueri dapat dibuat seperti biasa. Menarik untuk dicatat bahwa kueri heart hanyalah memilih,grupDengan,menghitung() urutan, sisanya relatif terhadap logika streaming.

Jadi inilah waktunya untuk mengatasi hal tersebut Mode keluaran() pilihan.

Mode keluaran aplikasi aliran menentukan bagaimana kita ingin menghitung (ulang) dan menulis hasilnya saat data baru tiba.

Itu dapat mengasumsikan tiga nilai berbeda:

  • Menambahkan: Hanya menambahkan catatan baru ke output.
  • Menyelesaikan: Menghitung ulang hasil penuh untuk setiap rekor baru.
  • Memperbarui: Perbarui catatan yang diubah.

Mode ini bisa masuk akal atau tidak tergantung pada aplikasi yang ditulis. Misalnya, mode โ€œlengkapโ€ mungkin tidak masuk akal jika ada pengelompokan atau pengurutan yang dilakukan.

Mari jalankan pekerjaan dalam mode "selesai" dan lihat hasilnya.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhรฃo โ€” Truk, Automรณvel-Car, Indefinido-Undefinisi, ร”nibus-Bus, Moto-Motocycle. Gambar oleh Penulis.

Saat catatan baru dimasukkan ke dalam aliran (lihat terminal di sebelah kanan), pekerjaan menghitung ulang hasil penuhnya. Hal ini dapat berguna dalam situasi di mana pengurutan baris penting, seperti peringkat atau kompetisi.

Namun, pendekatan ini mungkin tidak optimal jika jumlah kelompok terlalu besar atau perubahan individu tidak berdampak pada hasil keseluruhan.

Jadi, pilihan lainnya adalah menggunakan mode keluaran โ€œperbaruiโ€, yang menghasilkan pesan baru hanya untuk grup yang telah berubah. Lihat di bawah:

Kueri dengan mode keluaran "perbarui". Gambar oleh Penulis.

Mode "tambahkan" tidak tersedia untuk kueri dengan pengelompokan, jadi saya tidak akan dapat menampilkannya menggunakan pekerjaan yang sama. Tapi menurut saya ini adalah mode yang paling sederhana selalu menambahkan rekor baru ke output.

Mode keluaran ini lebih mudah dipahami jika Anda ingin menyimpan hasilnya ke dalam tabel. Dalam mode keluaran lengkap, tabel akan ditulis ulang untuk setiap pesan baru yang diproses, dalam mode pembaruan, hanya baris di mana beberapa pembaruan terjadi, dan penambahan akan selalu menambahkan baris baru di akhir.

Jendela waktu jatuh โ€” Menggabungkan menggunakan interval waktu

Dalam sistem streaming, pesan memiliki dua stempel waktu berbeda yang terkait dengannya: Waktu peristiwa โ€” Waktu saat pesan dibuat, dalam kasus kami waktu pembacaan sensor, dan Waktu pemrosesan โ€” Saat pesan dibaca oleh agen pemrosesan, dalam kasus kami ketika itu mencapai Spark.

Fitur penting dari alat pemrosesan aliran adalah kemampuan untuk menangani pemrosesan waktu peristiwa. Jendela jatuh adalah interval waktu tetap yang tidak tumpang tindih yang digunakan untuk membuat agregasi menggunakan kolom waktu peristiwa. Sederhananya, mereka membagi garis waktu menjadi potongan-potongan berukuran sama sehingga setiap peristiwa termasuk dalam satu interval.

Misalnya hitung, setiap 5 menit, berapa kendaraan yang terdeteksi dalam 5 menit terakhir.

jendela jatuh 5 menit. Gambar oleh Penulis.

Kode di bawah menggambarkan hal ini:

Pemrosesan seperti ini bisa sangat berguna dalam banyak situasi. Kembali ke pendeteksi kemacetan lalu lintas yang diusulkan sebelumnya, salah satu pendekatan yang mungkin dilakukan adalah dengan mengukur kecepatan rata-rata kendaraan dalam jangka waktu 10 menit dan melihat apakah kecepatannya berada di bawah ambang batas tertentu.

Pemrosesan waktu peristiwa adalah topik yang kompleks. Apa pun bisa terjadi saat menghadapinya, seperti pesan hilang, datang terlambat, atau rusak. Spark memiliki beberapa mekanisme untuk mencoba mengurangi masalah ini, seperti watermark, yang tidak akan kami fokuskan.

Jendela waktu juga dapat digunakan bersama dengan kolom lain di grupBy(). Contoh di bawah ini menghitung jumlah kendaraan berdasarkan jenisnya dalam waktu 5 menit.

Jendela waktu geser โ€” Fleksibilitas pada interval waktu

Jendela waktu geser merupakan fleksibilitas dari jendela yang berjatuhan. Daripada membuat interval yang tidak tumpang tindih, mereka memungkinkan penentuan seberapa sering setiap interval akan dibuat.

Misalnya setiap 5 menit, hitung berapa kendaraan yang terdeteksi dalam 30 menit terakhir.

Oleh karena itu, kejadian dapat berada pada banyak interval dan dihitung sebanyak yang diperlukan.

Untuk menentukan jendela geser, teruskan interval pembaruan ke jendela() fungsi.

Mari kita lihat hasilnya.

Seperti yang bisa kita lihat, kita memiliki jendela berdurasi 30 menit yang dibuat setiap 5 menit.

Fleksibilitas ini bisa sangat berguna untuk menentukan aturan bisnis yang lebih spesifik dan pemicu yang lebih kompleks. Misalnya, pendeteksi kemacetan lalu lintas kami dapat mengirimkan respons setiap 5 detik selama 10 menit terakhir dan membuat peringatan ketika kecepatan rata-rata mobil turun di bawah 20 km/jam.

Ini adalah sekilas konsep utama Spark Structured Streaming dan bagaimana konsep tersebut dapat diterapkan dengan Kafka.

Apache Kafka dan Apache Spark adalah alat yang andal dan tangguh yang digunakan oleh banyak perusahaan untuk memproses data dalam jumlah besar setiap hari, menjadikannya salah satu pasangan terkuat dalam tugas pemrosesan aliran.

Kami telah mempelajari cara mengisi, menggunakan, dan memproses topik Kafka menggunakan pekerjaan Spark. Ini bukanlah tugas yang sulit, seperti yang disebutkan dalam postingan, API pemrosesan aliran hampir sama dengan API batch biasa, hanya dengan beberapa penyesuaian kecil.

Kami juga telah membahas mode keluaran yang berbeda, sesuatu yang spesifik untuk aplikasi streaming, dan bagaimana masing-masing mode tersebut dapat digunakan. Terakhir, kami menjelajahi agregasi dengan jendela waktu, salah satu kemampuan utama pemrosesan aliran.

Sekali lagi, ini hanya sekilas saja, dan saya akan meninggalkan beberapa referensi di bawah jika Anda ingin menjelajah lebih dalam.

Semoga saya bisa membantu, terima kasih telah membaca! ๐Ÿ™‚

Semua kode tersedia di sini Repositori GitHub.
Data yang digunakan โ€”
Menular Volume Radares, terbuka Data, Gubernur Brasil.

[1] Fitur Deep Dive: Watermarking di Streaming Terstruktur Apache Spark โ€” Max Fisher di blog Databricks
[2] Chambers, B., & Zaharia, M. (2018). Spark: Panduan definitif: Pemrosesan data besar menjadi sederhana. โ€œO'Reilly Media, Inc.โ€.
[3] Logistik, Pengiriman, dan Transportasi Real-Time dengan Apache Kafkaโ€” Kai Waehner
[4] Menampilkan Apache Kafka di Netflix Studio dan Dunia Keuangan โ€” Blog konfluen
[5] Percikan Streaming & Kafka โ€” https://sparkbyexamples.com/

Sekilas tentang Spark Structured Streaming + Kafka Diterbitkan Ulang dari Sumber https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rssโ€”-7f60cf5620c9โ€”4 melalui https:/ /towardsdatascience.com/feed

<!โ€“

->

Stempel Waktu:

Lebih dari Konsultan Blockchain