نظرة سريعة على Spark Structured Streaming + Kafka

تعلم أساسيات كيفية استخدام هذا الثنائي القوي لمهام معالجة البث

تصوير نيخيتا سنغال on Unsplash

بدأت مؤخرًا دراسة الكثير عن Apache Kafka و Apache Spark ، وهما تقنيتان رائدتان في عالم هندسة البيانات.

لقد أنجزت العديد من المشاريع باستخدامها في الأشهر القليلة الماضية ؛ "دفق التعلم الآلي باستخدام Kafka و Debezium و BentoML" انه مثال. ينصب تركيزي على تعلم كيفية إنشاء خطوط بيانات قوية باستخدام هذه الأدوات الحديثة الشهيرة والتعرف على مزاياها وعيوبها.

في الأشهر الماضية ، قمت بالفعل بتغطية كيفية إنشاء خطوط أنابيب ETL باستخدام كلتا الأداتين ولكن لم أستخدمهما معًا مطلقًا ، وهذه هي الفجوة التي سأقوم بملئها اليوم.

هدفنا هو معرفة الفكرة العامة وراء إنشاء تطبيق دفق باستخدام Spark + Kafka وإلقاء نظرة سريعة على مفاهيمه الرئيسية باستخدام بيانات حقيقية.

الفكرة بسيطة - Apache Kafka هي أداة تدفق الرسائل ، حيث يكتب المنتجون الرسائل في أحد طرفي قائمة الانتظار (تسمى a موضوع) ليقرأها المستهلكون من جهة أخرى.

لكنها أداة معقدة للغاية ، تم إنشاؤها لتكون خدمة مراسلة موزعة مرنة ، مع جميع أنواع ضمانات التسليم (مرة واحدة بالضبط ، مرة واحدة ، أي) ، وتخزين الرسائل ، وتكرار الرسائل ، مع السماح أيضًا بالمرونة وقابلية التوسع والإنتاجية العالية. لديها مجموعة أوسع من حالات الاستخدام ، مثل اتصالات الخدمات المصغرة وأنظمة الأحداث في الوقت الفعلي وتدفق خطوط أنابيب ETL.

Apache Spark هو محرك تحويل بيانات موزع قائم على الذاكرة.

إنها أيضًا أداة معقدة للغاية ، قادرة على الاتصال بجميع أنواع قواعد البيانات وأنظمة الملفات والبنية التحتية السحابية. إنه مصمم للعمل في بيئات موزعة لموازنة المعالجة بين الآلات ، وتحقيق تحويلات عالية الأداء باستخدام فلسفة التقييم الكسول وتحسينات الاستعلام.

الجزء الرائع في الأمر هو أنه بحلول نهاية اليوم ، يكون الرمز هو مجرد استعلام SQL المعتاد أو (تقريبًا) نص Python + pandas الخاص بك ، مع كل السحر الذي يتم تجريده بموجب واجهة برمجة تطبيقات عالية المستوى سهلة الاستخدام.

انضم إلى هاتين التقنيتين ولدينا تطابق مثالي لبناء خط أنابيب ETL متدفق.

سنستخدم البيانات من أجهزة استشعار حركة المرور في مدينة بيلو هوريزونتي (البحرين) ، عاصمة ميناس جيرايس (البرازيل). إنها مجموعة بيانات ضخمة تحتوي على قياسات تدفق حركة المرور في عدة أماكن في المدينة. يكتشف كل جهاز استشعار بشكل دوري نوع السيارة التي تسير في ذلك الموقع (سيارة ، دراجة نارية ، حافلة / شاحنة) ، وسرعتها وطولها (وغيرها من المعلومات التي لن نستخدمها).

تمثل مجموعة البيانات هذه على وجه التحديد أحد التطبيقات الكلاسيكية لأنظمة التدفق - مجموعة من أجهزة الاستشعار ترسل قراءاتها باستمرار من الحقل.

في هذا السيناريو ، يمكن استخدام Apache Kafka كطبقة تجريدية بين المستشعرات والتطبيقات التي تستهلك بياناتها.

استخدم كافكا كطبقة تجريدية بين المصادر والخدمات. صورة المؤلف.

باستخدام هذا النوع من البنية التحتية ، من الممكن بناء جميع أنواع (ما يسمى) أنظمة تعتمد على الأحداث في الوقت الفعليمثل برنامج للكشف عن الاختناقات المرورية والتنبيه لها عندما يزداد عدد المركبات فجأة مع انخفاض متوسط ​​السرعة.

وهنا يأتي دور Apache Spark.

يحتوي على وحدة أصلية لمعالجة الدفق تسمى شرارة تدفق منظم، يمكنه الاتصال بكافكا ومعالجة رسائله.

تهيئة البيئة

كل ما تحتاجه هو عامل البناء والتركيب.

سنستخدم تكوين ملف docker-compose استنادًا إلى المستودعات التالية: شرارة الارتباط, رابط الكافكة.

./src الحجم هو المكان الذي نضع فيه نصوصنا.

لبدء البيئة ، ما عليك سوى تشغيل

عامل ميناء يؤلف

كل الكود متاح في هذا مستودع جيثب.

من أكثر الأشياء التي أحببتها عند البدء في دراسة Spark هو التشابه بين الكود المكتوب لها ونصوص python + pandas المعتادة. كان من السهل جدًا الهجرة.

باتباع نفس المنطق ، فإن وحدة البث في Spark تشبه إلى حد كبير رمز الشرارة المعتاد ، مما يجعل من السهل الترحيل من التطبيقات الدفعية إلى تطبيقات الدفق.

مع ذلك ، في الأقسام التالية ، سنركز على تعلم خصائص البث المنظم لـ Spark ، أي الميزات الجديدة التي يمتلكها.

عملنا الأول

لنبدأ ببطء ونبني مثالاً على لعبة

أول شيء يجب فعله هو إنشاء موضوع كافكا حيث تستهلك وظيفتنا الشرارة الرسائل.

يتم ذلك عن طريق الوصول إلى محطة حاويات كافكا وتنفيذ:

kafka-topics.sh - إنشاء مضيف محلي لخادم bootstrap: 9092 - موضوع test_topic

لمحاكاة منتج يكتب رسائل حول هذا الموضوع ، دعنا نستخدم الامتداد كفقا-وحدة-منتج. داخل الحاوية أيضًا:

kafka-console-producer.sh - hoststrap-server localhost: 9092 --topic test_topic --property "parse.key = true" --property "key.separator =:"

من الآن ، سيتم إرسال كل سطر تمت كتابته في الجهاز كرسالة إلى موضوع الاختبار. يستخدم الحرف ":" للفصل بين مفتاح الرسالة وقيمتها (المفتاح: القيمة).

لنقم بإنشاء وظيفة Spark لاستهلاك هذا الموضوع.

يجب وضع الرمز داخل ملف / src / الجري مجلد (لا شيء خاص ، فقط المجلد الذي اخترته).

الشيء الرئيسي الذي يجب ملاحظته هو أننا نستخدم السمات readStream و الكتابة بدلا من القراءة والكتابة العادية. هذا هو الجانب الرئيسي الذي يجعل Spark يتعامل مع وظيفتنا كتطبيق متدفق.

للاتصال بكافكا ، من الضروري تحديد الخادم والموضوع. الخيار startOffsets = "في أقرب وقت ممكن "يقول سبارك لقراءة الموضوع من البداية. أيضًا ، لأن كافكا يخزن رسائله بتنسيق ثنائي شكل ، يجب فك تشفيرها إلى سلسلة.

سيتم استكشاف المزيد من الخيارات الأخرى.

الآن ، دعنا نصل إلى حاوية Spark ونشغل الوظيفة.

spark-submit - الحزم org.apache.spark: spark-sql-kafka-0-10_2.12: 3.3.0 /src/streaming/read_test_stream.py

بعد بضع ثوانٍ من التكوين ، سيبدأ في استهلاك الموضوع.

شرر رسائل مستهلكة من كافكا. صورة المؤلف.

يعمل Spark Streaming في الخلط الجزئي الوضع ، ولهذا السبب نرى معلومات "الدُفعة" عندما تستهلك الرسائل.

الدُفعات الصغيرة هي نوعًا ما بين التدفق "الحقيقي" الكامل ، حيث تتم معالجة جميع الرسائل بشكل فردي عند وصولها ، والدُفعة المعتادة ، حيث تظل البيانات ثابتة ويتم استهلاكها عند الطلب. سينتظر Spark بعض الوقت في محاولة لتجميع الرسائل لمعالجتها معًا ، مما يقلل من الحمل ويزيد من زمن الوصول. يمكن ضبط هذا لاحتياجاتك.

أنا لست نوعًا سريعًا للغاية ، لذا يعالج Spark الرسالة قبل أن أتمكن من تضمين رسائل جديدة في الدفعة الحالية.

وكانت هذه أول وظيفة دفق لنا!

آمل أن ينتابك هذا الشعور: ليس من الصعب كتابة كود لوظيفة معالجة البث ، ولكن هناك بعض المشاكل.

كتابة البيانات إلى تيار كافكا

حان الوقت الآن لبدء اللعب ببيانات المستشعر.

يمكنك تنزيل الرمز البريدي ملف من أغسطس 2022 واستخرجه في ملف /البيانات الصوت. البيانات في الأصل بتنسيق JSON وتستهلك حوالي 23 جيجا بايت من المساحة. أول شيء يجب فعله هو تحويله إلى باركيه لتحسين مساحة القرص ووقت القراءة.

تم تفصيل وظائف الشرارة للقيام بذلك في مستودع GitHub ، كل ما عليك فعله هو تنفيذها:

شرارة إرسال /src/transform_json_to_parquet.pyشرارة إرسال /src/join_parquet_files.py

اعتمادًا على جهازك ، قد يستغرق التنفيذ بعض الوقت. ولكن من المجدي أن يكون حجم ملف الباركيه النهائي حوالي 1 جيجا بايت (أصغر من 20 مرة) وأسرع بكثير في القراءة.

نحتاج أيضًا إلى إنشاء موضوع كافكا لتلقي رسائلنا:

kafka-topics.sh - إنشاء - النسخ المتماثل 1 - المضيف المحلي لخادم التمهيد: 9092 - مستشعر حركة المرور الموضوع

اختياريًا ، إذا كنت تريد عرض الرسائل الواردة ، فمن الممكن إعداد عميل وحدة التحكم.

kafka-console-consumer.sh --topic traffic_sensor - bootstrap-server localhost: 9092

كتابة البيانات حول موضوع كافكا سهلة ، لكن بها بعض التفاصيل.

في الدفق المنظم ، يتمثل السلوك الافتراضي في عدم محاولة استنتاج مخطط البيانات (الأعمدة وأنواعها) ، لذلك نحتاج إلى تمرير واحد.

رسائل كافكا هي مجرد أزواج من السلاسل الثنائية ذات القيمة الرئيسية ، لذلك نحتاج إلى تمثيل بياناتنا بهذا التنسيق. يمكن تحقيق ذلك بسهولة عن طريق تحويل جميع الصفوف إلى سلاسل JSON وترميزها في صورة ثنائية وتخزين النتيجة في عمود "القيمة".

تحويل الأعمدة إلى سلاسل JSON. صورة المؤلف.

تعتبر مفاتيح الرسائل مهمة جدًا في كافكا ، لكنها لن تكون مفيدة في اختباراتنا ، لذا سيكون لكل الرسائل نفس الشيء.

كما ذكرنا سابقًا ، فإن مجموعة البيانات هذه ضخمة ، لذلك قمت بتحديد عدد الرسائل المدرجة بـ 500,000 رسالة.

أخيرًا ، مررنا خادم وموضوع كافكا و "نقطة تفتيش"حيث ستخزن الشرارة تقدم التنفيذ ، وهي مفيدة للتعافي من الأخطاء.

تنفيذ الوظيفة:

spark-submit --packages org.apache.spark: spark-sql-kafka-0-10_2.12: 3.3.0 /src/streaming/insert_traffic_topic.py
إدخال البيانات في كافكا. صورة المؤلف.

على اليسار ، تقرأ وظيفة Spark الملف ، على اليمين ، ملف a kafka- وحدة التحكم-المستهلك يعرض الرسائل القادمة.

موضوع حركة المرور لدينا مأهول وجاهز تقريبًا للمعالجة.

من المهم أن تتذكر أننا استخدمنا وظيفة شرارة لملء موضوعنا فقط لأغراض التعلم. في سيناريو حقيقي ، سترسل المستشعرات نفسها قراءات مباشرة إلى كافكا.

لمحاكاة هذا السلوك الديناميكي ، يكتب البرنامج النصي أدناه صفًا واحدًا للموضوع كل 1 ثانية.

أوضاع الإخراج - حساب عدد المركبات حسب النوع

المضي قدمًا ، دعنا ننشئ وظيفة لحساب عدد المركبات حسب النوع.

يحتوي العمود "Classificação" (التصنيف) على نوع السيارة المكتشفة.

بينما نقرأ من الموضوع ، نحتاج إلى تحويل سلاسل JSON الثنائية إلى التنسيق العمودي.

بمجرد الانتهاء من ذلك ، يمكن إنشاء الاستعلام كالمعتاد. من المثير للاهتمام ملاحظة أن قلب الاستعلام هو مجرد ملف حدد().مجموعة من().عد() التسلسل ، كل الباقي متعلق بمنطق التدفق.

لذا حان الوقت لمعالجة وضع الإخراج() اختيار.

يحدد وضع الإخراج لتطبيق الدفق كيف نريد (إعادة) حساب النتائج وكتابتها عند وصول بيانات جديدة.

يمكن أن يفترض ثلاث قيم مختلفة:

  • ألحق: أضف السجلات الجديدة فقط إلى الإخراج.
  • كامل: أعد حساب النتيجة الكاملة لكل سجل جديد.
  • تحديث: تحديث السجلات المتغيرة.

يمكن أو لا يمكن أن تكون هذه الأوضاع منطقية اعتمادًا على التطبيق المكتوب. على سبيل المثال ، قد لا يكون الوضع "كامل" منطقيًا إذا تم إجراء أي تجميع أو فرز.

دعنا ننفذ المهمة في وضع "الإكمال" ونلقي نظرة على النتائج.

spark-submit - الحزم org.apache.spark: spark-sql-kafka-0-10_2.12: 3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão - شاحنة ، Automóvel-Car ، Indefinido-Undefined ، Ônibus-Bus ، Moto-Motocycle. صورة المؤلف.

عند إدراج سجلات جديدة في الدفق (انظر المحطة على اليمين) ، تعيد الوظيفة حساب النتيجة الكاملة. يمكن أن يكون هذا مفيدًا في المواقف التي يكون فيها ترتيب الصف أمرًا مهمًا ، مثل الترتيب أو المنافسة.

ومع ذلك ، قد لا يكون هذا النهج هو الأمثل إذا كان عدد المجموعات كبيرًا جدًا أو إذا كانت التغييرات الفردية لا تؤثر على النتيجة الإجمالية.

لذلك ، هناك خيار آخر وهو استخدام وضع الإخراج "التحديث" ، والذي يولد رسالة جديدة فقط للمجموعات التي تغيرت. انظر أدناه:

الاستعلام مع وضع الإخراج "تحديث". صورة المؤلف.

وضع "الإلحاق" غير متاح للاستعلامات ذات التجميع ، لذلك لن أتمكن من العرض باستخدام نفس الوظيفة. لكنني أعتقد أنه أبسط طريقة دائما يضيف رقما قياسيا جديدا للمخرجات.

من الأسهل فهم أوضاع الإخراج هذه إذا كنت تفكر في حفظ النتائج في جدول. في وضع الإخراج الكامل ، ستتم إعادة كتابة الجدول لكل رسالة جديدة تتم معالجتها ، في وضع التحديث ، فقط الأسطر التي حدث فيها بعض التحديث ، وسيضيف الملحق دائمًا سطرًا جديدًا إلى النهاية.

نافذة زمنية متقلبة - التجميع باستخدام الفواصل الزمنية

في أنظمة البث ، تحتوي الرسائل على طوابع زمنية مختلفة مرتبطة بها: وقت الحدث - الوقت الذي تم فيه إنشاء الرسالة ، ووقت قراءة المستشعر في حالتنا ، ووقت المعالجة - عندما يقرأ وكيل المعالجة الرسالة ، في حالتنا عندما تصل سبارك.

ميزة مهمة لأدوات معالجة الدفق هي القدرة على التعامل مع معالجة وقت الحدث. النوافذ المتدحرجة هي فترات زمنية ثابتة غير متداخلة تُستخدم لعمل تجميعات باستخدام أعمدة وقت الحدث. لتوضيح الأمر بشكل أكثر بساطة ، يقومون بتقسيم المخطط الزمني إلى شرائح متساوية الحجم بحيث ينتمي كل حدث إلى فترة زمنية واحدة.

على سبيل المثال ، احسب ، كل 5 دقائق ، عدد المركبات التي تم اكتشافها في آخر 5 دقائق.

نافذة هبوط لمدة 5 دقائق. صورة المؤلف.

يوضح الكود أدناه هذا:

يمكن أن يكون هذا النوع من المعالجة مفيدًا للغاية في العديد من المواقف. بالعودة إلى كاشف الازدحام المروري المقترح سابقًا ، تتمثل إحدى الطرق الممكنة في قياس متوسط ​​سرعة المركبات في نافذة مدتها 10 دقائق ومعرفة ما إذا كانت أقل من عتبة معينة.

تعد معالجة وقت الحدث موضوعًا معقدًا. يمكن أن يحدث كل شيء عند التعامل معه ، مثل فقدان الرسائل أو وصولها بعد فوات الأوان أو تعطلها. لدى Spark عدة آليات لمحاولة التخفيف من المشكلات ، مثل علامات مائيةالتي لن نركز عليها.

يمكن أيضًا استخدام النوافذ الزمنية جنبًا إلى جنب مع الأعمدة الأخرى في ملف مجموعة من(). يحسب المثال أدناه عدد المركبات حسب النوع في نافذة 5 دقائق.

انزلاق النافذة الزمنية - المرونة في الفترات الزمنية

انزلاق النوافذ الزمنية هي مرونة في انزلاق النوافذ. بدلاً من إنشاء فترات زمنية غير متداخلة ، فإنها تسمح بتحديد عدد مرات إنشاء كل فترة زمنية.

على سبيل المثال ، احسب عدد المركبات التي تم اكتشافها في آخر 5 دقيقة كل 30 دقائق.

لهذا السبب ، يمكن أن تنتمي الأحداث إلى فترات زمنية عديدة ويتم احتسابها عدة مرات حسب الحاجة.

لتحديد نافذة منزلقة ، ما عليك سوى تمرير الفاصل الزمني للتحديث إلى ملف نافذة() وظيفة.

دعونا نرى الإخراج.

كما نرى ، لدينا 30 دقيقة يتم إنشاؤها كل 5 دقائق.

يمكن أن تكون هذه المرونة مفيدة جدًا لتحديد قواعد عمل أكثر تحديدًا ومحفزات أكثر تعقيدًا. على سبيل المثال ، يمكن لكاشف الازدحام المروري إرسال ردود كل 5 ثوانٍ حول الدقائق العشر الماضية وإنشاء تنبيه عندما ينخفض ​​متوسط ​​سرعة السيارة إلى أقل من 10 كم / ساعة.

كانت هذه نظرة سريعة على المفاهيم الرئيسية لـ Spark Structured Streaming وكيف يمكن تطبيقها مع كافكا.

تعد كل من Apache Kafka و Apache Spark أدوات موثوقة وقوية تستخدمها العديد من الشركات لمعالجة كميات هائلة من البيانات يوميًا ، مما يجعلها واحدة من أقوى الأزواج في مهمة معالجة البث.

لقد تعلمنا كيفية تعبئة موضوعات كافكا واستهلاكها ومعالجتها باستخدام وظائف Spark. لم تكن هذه مهمة صعبة ، كما هو مذكور في المنشور ، فإن واجهة برمجة تطبيقات معالجة الدفق تكاد تكون مساوية لواجهة برمجة تطبيقات الدُفعات المعتادة ، مع بعض التعديلات الطفيفة فقط.

لقد ناقشنا أيضًا أوضاع الإخراج المختلفة ، وشيء خاص بدفق التطبيقات ، وكيف يمكن استخدام كل منها. أخيرًا وليس آخرًا ، استكشفنا التجميعات باستخدام النوافذ الزمنية ، وهي إحدى الإمكانات الرئيسية لمعالجة البث.

مرة أخرى ، كانت هذه نظرة سريعة ، وسأترك بعض المراجع أدناه إذا كنت تريد الاستكشاف بشكل أعمق.

آمل أن أكون قد ساعدت بطريقة ما ، شكرًا لك على القراءة! 🙂

كل الكود متاح في هذا مستودع جيثب.
البيانات المستخدمة -
Contagens Volumétricas de Radares, البيانات المفتوحة، حاكم البرازيل.

[1] ميزة الغوص العميق: وضع علامة مائية في دفق Apache Spark المنظم - ماكس فيشر على مدونة Databricks
[2] تشامبرز ، ب ، وزهارية ، م. (2018). Spark: الدليل النهائي: أصبحت معالجة البيانات الضخمة بسيطة. “O'Reilly Media، Inc.”.
[3] الخدمات اللوجستية والشحن والنقل في الوقت الفعلي مع Apache Kafka- كاي وينر
[4] يضم Apache Kafka في Netflix Studio و Finance World - مدونة متكدسة
[5] Spark Streaming & كافكا - https://sparkbyexamples.com/

نظرة سريعة على Spark Structured Streaming + Kafka المعاد نشره من المصدر https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325؟source=rss—-7f60cf5620c9—4 عبر https: / /towardsdatascience.com/feed

<!–

->

الطابع الزمني:

اكثر من مستشارو Blockchain