یادگیری اصول اولیه نحوه استفاده از این دوتایی قدرتمند برای کارهای پردازش جریان
اخیراً شروع به مطالعه زیادی در مورد آپاچی کافکا و آپاچی اسپارک، دو فناوری پیشرو در دنیای مهندسی داده کردم.
من چندین پروژه با استفاده از آنها در چند ماه گذشته ساخته ام. "یادگیری ماشین جریانی با کافکا، دبیزیوم و بنتوML” یک نمونه است. تمرکز من این است که یاد بگیرم چگونه خطوط لوله داده قدرتمندی را با این ابزارهای معروف مدرن ایجاد کنم و مزایا و معایب آنها را درک کنم.
در ماههای گذشته، من قبلاً نحوه ایجاد خطوط لوله ETL را با استفاده از هر دو ابزار اما هرگز با هم استفاده نکردم، و این شکافی است که امروز پر میکنم.
هدف ما یادگیری ایده کلی در پشت ساخت یک برنامه استریم با اسپارک + کافکا و نگاهی سریع به مفاهیم اصلی آن با استفاده از داده های واقعی است.
ایده ساده است - آپاچی کافکا یک ابزار پخش پیام است که در آن تولیدکنندگان پیام ها را در یک انتهای صف می نویسند (به نام موضوع) از طرف دیگر توسط مصرف کنندگان خوانده شود.
اما این یک ابزار بسیار پیچیده است که به عنوان یک سرویس پیام رسانی توزیع شده انعطاف پذیر ساخته شده است، با انواع ضمانت های تحویل (دقیقا یک بار، یک بار، هر بار)، ذخیره پیام و تکرار پیام، در حالی که انعطاف پذیری، مقیاس پذیری و توان عملیاتی بالا را نیز فراهم می کند. دارای مجموعه وسیع تری از موارد استفاده، مانند ارتباطات میکروسرویس، سیستم های رویداد بلادرنگ، و جریان خطوط لوله ETL.
Apache Spark یک موتور تبدیل داده مبتنی بر حافظه توزیع شده است.
همچنین ابزار بسیار پیچیده ای است که می تواند با انواع پایگاه های داده، سیستم های فایل و زیرساخت های ابری ارتباط برقرار کند. این برای کار در محیط های توزیع شده برای موازی سازی پردازش بین ماشین ها، دستیابی به دگرگونی های با کارایی بالا با استفاده از فلسفه ارزیابی تنبل و بهینه سازی پرس و جو طراحی شده است.
نکته جالب در مورد آن این است که، تا پایان روز، کد فقط پرس و جوی SQL معمول شما یا (تقریبا) اسکریپت پایتون+پاندا شما است، با تمام جادوگری که تحت یک API سطح بالا و کاربرپسند خوب انتزاع شده است.
به این دو فناوری بپیوندید و ما یک تطابق کامل برای ایجاد خط لوله جریان ETL داریم.
ما از داده های حسگرهای ترافیک در شهر بلو هوریزونته (BH)، پایتخت میناس گرایس (برزیل) استفاده خواهیم کرد. این مجموعه داده عظیمی است که شامل اندازه گیری جریان ترافیک در چندین مکان در شهر است. هر سنسور به طور دوره ای نوع وسیله نقلیه رانده شده در آن مکان (ماشین، موتور سیکلت، اتوبوس/کامیون)، سرعت و طول آن (و سایر اطلاعاتی که ما از آنها استفاده نمی کنیم) را تشخیص می دهد.
این مجموعه داده دقیقاً یکی از کاربردهای کلاسیک برای سیستمهای استریم را نشان میدهد - گروهی از حسگرها که قرائتهای خود را به طور مداوم از میدان ارسال میکنند.
در این سناریو، آپاچی کافکا می تواند به عنوان یک لایه انتزاعی بین سنسورها و برنامه هایی که داده های آنها را مصرف می کنند، استفاده شود.
با این نوع زیرساخت، امکان ساخت انواع (به اصطلاح) وجود دارد. سیستم های رویداد محور در زمان واقعی، مانند برنامه ای برای شناسایی و هشدار برای راه بندان در زمانی که تعداد وسایل نقلیه به طور ناگهانی با کاهش میانگین سرعت افزایش می یابد.
و اینجاست که آپاچی اسپارک وارد عمل می شود.
دارای یک ماژول بومی برای پردازش جریان به نام جریان ساختار یافته جرقه، که می تواند به کافکا متصل شود و پیام های آن را پردازش کند.
راه اندازی محیط
تنها چیزی که نیاز دارید docker و docker-compose است.
ما از یک پیکربندی فایل docker-compose بر اساس مخازن زیر استفاده خواهیم کرد: جرقه پیوند, لینک کافکا.
La ./src حجم جایی است که ما اسکریپت های خود را قرار می دهیم.
برای شروع محیط کافیست اجرا کنید
docker-compose
تمام کدها در این موجود است مخزن GitHub.
یکی از چیزهایی که هنگام شروع مطالعه Spark بیشتر دوست داشتم، شباهت کدهای نوشته شده برای آن و اسکریپت های معمول پایتون + پانداهای من بود. مهاجرت خیلی راحت بود.
با پیروی از همین منطق، ماژول استریم اسپارک بسیار شبیه به کد جرقه معمولی است و مهاجرت از برنامه های دسته ای به برنامه های استریم را آسان می کند.
با این اوصاف، در بخشهای بعدی، ما بر روی یادگیری ویژگیهای استریم ساختار یافته Spark تمرکز خواهیم کرد، یعنی اینکه چه ویژگیهای جدیدی دارد.
اولین کار ما
بیایید آهسته شروع کنیم و یک نمونه اسباب بازی بسازیم
اولین کاری که باید انجام دهید این است که یک موضوع کافکا ایجاد کنید که از آنجا کار جرقه ما پیام ها را مصرف کند.
این کار توسط دسترسی به ترمینال کانتینری کافکا و اجرا کردن:
kafka-topics.sh --ایجاد --bootstrap-server localhost:9092 --topic test_topic
برای شبیه سازی تهیه کننده ای که در مورد این موضوع پیام می نویسد، از آن استفاده می کنیم کافکا-کنسول-تولیدکننده. همچنین داخل ظرف:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"
از این پس هر خطی که در ترمینال تایپ شود به عنوان یک پیام به موضوع آزمون ارسال می شود. کاراکتر ":" برای جدا کردن کلید و مقدار پیام (key:value) استفاده می شود.
بیایید یک Spark job برای مصرف این موضوع ایجاد کنیم.
کد باید در داخل قرار داده شود /src/streaming پوشه (هیچ چیز خاصی نیست، فقط پوشه ای که من انتخاب کردم).
نکته کلیدی که باید توجه داشت این است که ما از ویژگی ها استفاده می کنیم readStream و writeStream، به جای خواندن و نوشتن معمولی این جنبه اصلی است که باعث می شود Spark با کار ما به عنوان یک برنامه استریم برخورد کند.
برای اتصال به کافکا باید سرور و موضوع را مشخص کنید. گزینه startingOffsets=“Earliest” به اسپارک می گوید که موضوع را از ابتدا بخواند. همچنین، زیرا کافکا پیام های خود را در آن ذخیره می کند دوتایی شکل، آنها باید رمزگشایی شوند رشته.
گزینه های دیگر بیشتر مورد بررسی قرار خواهند گرفت.
حالا بیایید به ظرف Spark دسترسی پیدا کنیم و کار را اجرا کنیم.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py
پس از چند ثانیه پیکربندی، شروع به مصرف موضوع می کند.
Spark Streaming در کار می کند میکرو بچینگ حالت، و به همین دلیل است که وقتی پیامها را مصرف میکند، اطلاعات «دستهای» را میبینیم.
Micro-batching تا حدودی بین استریم کامل «واقعی» است، که در آن همه پیامها به صورت جداگانه پردازش میشوند، و دستهای معمول، که در آن دادهها ثابت میمانند و بر حسب تقاضا مصرف میشوند. Spark مدتی صبر می کند تا پیام ها را جمع آوری کند تا آنها را با هم پردازش کند و هزینه های اضافی را کاهش دهد و تاخیر را افزایش دهد. این را می توان با نیازهای شما تنظیم کرد.
من تایپکننده فوقالعاده سریع نیستم، بنابراین Spark قبل از اینکه بتوانم پیامهای جدید را در دسته فعلی اضافه کنم، پیام را پردازش میکند.
و این اولین کار پخش جریانی ما بود!
امیدوارم این احساس را داشته باشید: کدنویسی یک کار پردازش جریانی کار سختی نیست، اما مشکلاتی وجود دارد.
نوشتن داده در جریان کافکا
اکنون زمان شروع بازی با داده های سنسور است.
شما می توانید دانلود کنید زیپ فایل از آگوست 2022 و استخراج آن در /داده ها جلد. داده ها در اصل JSON هستند و حدود 23 گیگابایت فضا می گیرند. اولین کاری که باید انجام دهید این است که آن را به پارکت تبدیل کنید تا فضای دیسک و زمان خواندن بهینه شود.
کارهای spark برای انجام این کار در مخزن GitHub به تفصیل آمده است، تنها کاری که باید انجام دهید این است که آنها را اجرا کنید:
spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py
بسته به دستگاه شما، اجرا ممکن است کمی طول بکشد. اما به درد می خورد، اندازه فایل پارکت نهایی ~ 1 گیگابیت است (بیش از 20 برابر کوچکتر) و خواندن آن بسیار سریعتر است.
ما همچنین باید موضوع کافکا را برای دریافت پیام های خود ایجاد کنیم:
kafka-topics.sh --ایجاد --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor
در صورت تمایل، اگر میخواهید پیامهای دریافتی را نمایش دهید، میتوانید یک مصرفکننده کنسول راهاندازی کنید.
kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092
نوشتن داده در مورد یک موضوع کافکا آسان است، اما دارای برخی جزئیات است.
در جریان ساختیافته، رفتار پیشفرض این است که سعی نکنید طرحواره داده (ستونها و انواع آنها) را استنتاج کنیم، بنابراین باید یکی را پاس کنیم.
پیام های کافکا فقط جفت رشته های دودویی کلید-مقدار هستند، بنابراین باید داده های خود را در این قالب نمایش دهیم. با تبدیل همه ردیفها به رشتههای JSON، کدگذاری آنها به صورت باینری و ذخیره نتیجه در ستون «مقدار» به راحتی میتوان به این امر دست یافت.
کلیدهای پیام در کافکا بسیار مهم هستند، اما در تست های ما کاربردی نخواهند داشت، بنابراین همه پیام ها یکسان خواهند داشت.
همانطور که قبلا ذکر شد، این مجموعه داده عظیم است، بنابراین تعداد پیام های درج شده را به 500,000 محدود کردم.
در نهایت سرور کافکا و موضوع و یک «مکان بازرسیجایی که جرقه پیشرفت اجرا را ذخیره میکند و برای بازیابی خطاها مفید است.
اجرای کار:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
در سمت چپ، کار Spark فایل را می خواند، در سمت راست، a کافکا-کنسول-مصرف کننده پیام های رسیده را نمایش می دهد.
موضوع ترافیک ما پر شده و تقریباً آماده پردازش است.
مهم است که به یاد داشته باشید که ما از یک spark job برای پر کردن موضوع خود فقط برای اهداف یادگیری استفاده کردیم. در یک سناریوی واقعی، خود حسگرها خوانشها را مستقیماً برای کافکا ارسال میکنند.
برای شبیه سازی این رفتار پویا، اسکریپت زیر هر 1 ثانیه یک ردیف به موضوع می نویسد.
حالت های خروجی - شمارش تعداد وسایل نقلیه بر اساس نوع
در ادامه، بیایید شغلی ایجاد کنیم تا تعداد وسایل نقلیه را بر اساس نوع بشماریم.
ستون "Classificação" (طبقه بندی) حاوی نوع وسیله نقلیه شناسایی شده است.
همانطور که از موضوع می خوانیم، باید رشته های باینری JSON را به فرمت ستونی برگردانیم.
پس از انجام این کار، پرس و جو می تواند به طور معمول ساخته شود. جالب است بدانید که قلب پرس و جو فقط همان است را انتخاب کنید()دسته بندی بر اساس()تعداد دفعات مشاهده() دنباله، بقیه نسبت به منطق جریان است.
پس وقت آن است که به آن بپردازیم حالت خروجی() گزینه.
حالت خروجی یک برنامه استریم مشخص می کند که چگونه می خواهیم نتایج را با رسیدن داده های جدید (دوباره) محاسبه و بنویسیم.
می تواند سه مقدار متفاوت را در نظر بگیرد:
- ضمیمه کردن: فقط رکوردهای جدید را به خروجی اضافه کنید.
- تکمیل: نتیجه کامل را برای هر رکورد جدید دوباره محاسبه کنید.
- بروزرسانی: سوابق تغییر یافته را به روز کنید.
این حالت ها بسته به برنامه نوشته شده می توانند یا نمی توانند معنی داشته باشند. به عنوان مثال، اگر گروه بندی یا مرتب سازی انجام شود، حالت "کامل" ممکن است معنی نداشته باشد.
بیایید کار را در حالت "کامل" اجرا کنیم و به نتایج نگاه کنیم.
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
همانطور که رکوردهای جدید در جریان درج می شوند (ترمینال را در سمت راست ببینید)، کار نتیجه کامل را دوباره محاسبه می کند. این می تواند در شرایطی که ترتیب ردیف مهم است، مانند رتبه بندی یا رقابت، مفید باشد.
با این حال، اگر تعداد گروه ها خیلی زیاد باشد یا تغییرات فردی بر نتیجه کلی تأثیر نگذارد، این رویکرد ممکن است بهینه نباشد.
بنابراین، گزینه دیگر استفاده از حالت خروجی "به روز رسانی" است که فقط برای گروه هایی که تغییر کرده اند یک پیام جدید ایجاد می کند. زیر را ببینید:
حالت "پیوست" برای جستجوهای دارای گروه بندی در دسترس نیست، بنابراین من نمی توانم با استفاده از همان کار نشان دهم. اما من فکر می کنم که این ساده ترین حالت، آن است همیشه یک رکورد جدید به خروجی اضافه می کند.
اگر به ذخیره نتایج در جدول فکر می کنید، درک این حالت های خروجی ساده تر است. در حالت خروجی کامل، جدول برای هر پیام جدیدی که پردازش میشود، بازنویسی میشود، در حالت بهروزرسانی، فقط خطوطی که برخی بهروزرسانیها در آنها رخ داده است، و پیوست همیشه یک خط جدید به انتها اضافه میکند.
پنجره زمان غلت - جمع آوری با استفاده از فواصل زمانی
در سیستمهای استریم، پیامها دو مهر زمانی متفاوت دارند: زمان رویداد - زمانی که پیام ایجاد شد، در مورد ما زمان خواندن حسگر، و زمان پردازش - زمانی که پیام توسط عامل پردازش خوانده میشود، در مورد ما وقتی به اسپارک می رسد.
یکی از ویژگیهای مهم ابزارهای پردازش جریانی، توانایی پردازش زمان رویداد است. پنجرههای پراکنده فواصل زمانی ثابتی هستند که با یکدیگر همپوشانی ندارند و برای ایجاد تجمیع با استفاده از ستونهای رویداد-زمان استفاده میشوند. به بیان ساده تر، آنها خط زمانی را به برش هایی با اندازه مساوی تقسیم می کنند تا هر رویداد به یک بازه واحد تعلق داشته باشد.
به عنوان مثال، هر 5 دقیقه شمارش کنید که در 5 دقیقه گذشته چند وسیله نقلیه شناسایی شده است.
کد زیر این موضوع را نشان می دهد:
این نوع پردازش می تواند در بسیاری از موقعیت ها بسیار مفید باشد. با بازگشت به آشکارساز ترافیکی که قبلاً پیشنهاد شد، یک روش ممکن این است که میانگین سرعت وسایل نقلیه را در یک پنجره 10 دقیقه اندازه گیری کنیم و ببینیم که آیا زیر یک آستانه مشخص است یا خیر.
پردازش زمان رویداد یک موضوع پیچیده است. هنگام مواجهه با آن همه چیز ممکن است اتفاق بیفتد، مانند گم شدن پیامها، دیر رسیدن یا از کار افتادن آنها. Spark مکانیسم های مختلفی برای کاهش مشکلات دارد، مانند متن، که ما روی آن تمرکز نخواهیم کرد.
پنجرههای زمانی را میتوان به همراه ستونهای دیگر در آن استفاده کرد دسته بندی بر اساس(). مثال زیر تعداد وسایل نقلیه را بر اساس نوع در یک پنجره 5 دقیقه ای شمارش می کند.
پنجره زمانی کشویی - انعطاف پذیری در فواصل زمانی
پنجره های زمانی کشویی انعطاف پذیری پنجره های غلتشی هستند. به جای ایجاد فواصل غیر همپوشانی، آنها اجازه میدهند هر بازه زمانی ایجاد شود.
به عنوان مثال، هر 5 دقیقه، شمارش کنید که در 30 دقیقه گذشته چند وسیله نقلیه شناسایی شده است.
به همین دلیل، رویدادها می توانند به فواصل زیادی تعلق داشته باشند و به تعداد دفعات مورد نیاز شمارش شوند.
برای تعریف یک پنجره کشویی، کافی است فاصله به روز رسانی را به پنجره منتقل کنید پنجره() عملکرد.
خروجی را ببینیم.
همانطور که می بینیم، ما هر 30 دقیقه 5 دقیقه پنجره ایجاد می کنیم.
این انعطافپذیری میتواند برای تعریف قوانین تجاری خاصتر و محرکهای پیچیدهتر بسیار مفید باشد. برای مثال، ردیاب ترافیک ما میتواند هر 5 ثانیه یک بار در 10 دقیقه گذشته پاسخهایی ارسال کند و زمانی که میانگین سرعت خودرو به زیر 20 کیلومتر در ساعت میرسد، هشدار ایجاد میکند.
این نگاهی سریع به مفاهیم اصلی جریان ساختاری جرقه و نحوه اعمال آنها با کافکا بود.
آپاچی کافکا و آپاچی اسپارک هر دو ابزارهای قابل اعتماد و قوی هستند که توسط بسیاری از شرکتها برای پردازش روزانه مقادیر باورنکردنی داده مورد استفاده قرار میگیرند و آنها را به یکی از قویترین جفتها در کار پردازش جریانی تبدیل میکنند.
ما یاد گرفتهایم که چگونه موضوعات کافکا را با استفاده از Spark Jobs پر کنیم، مصرف کنیم و پردازش کنیم. این کار سختی نبود، همانطور که در پست ذکر شد، API پردازش جریان تقریباً با API معمولی دسته ای برابر است، فقط با برخی تنظیمات جزئی.
ما همچنین در مورد حالتهای خروجی مختلف، چیزی خاص برای برنامههای استریم، و نحوه استفاده از هر یک بحث کردهایم. آخرین اما نه کماهمیت، ما انباشتهها را با پنجرههای زمانی، یکی از قابلیتهای اصلی پردازش جریان، بررسی کردیم.
باز هم، این یک نگاه سریع بود، و اگر میخواهید عمیقتر کاوش کنید، برخی از مراجع را در زیر میگذارم.
امیدوارم به نحوی کمکی کرده باشم، ممنون که خواندید! 🙂
تمام کدها در این موجود است مخزن GitHub.
داده های مورد استفاده - Contagens Volumétricas de Radares, داده های باز، فرماندار برزیل
[1] ویژگی Deep Dive: Watermarking در Apache Spark Structured Streaming - مکس فیشر در وبلاگ Databricks
[2] چمبرز، بی.، و زهاریا، ام. (2018). Spark: راهنمای قطعی: پردازش کلان داده ساده شده است. "O'Reilly Media, Inc."
[3] لجستیک، حمل و نقل و حمل و نقل در زمان واقعی با آپاچی کافکا- کای وانر
[4] با حضور آپاچی کافکا در استودیو نتفلیکس و دنیای مالی - وبلاگ همخوان
[5] اسپارک استریمینگ و کافکا — https://sparkbyexamples.com/
نگاهی سریع به جریان ساختار یافته Spark + کافکا بازنشر شده از منبع https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9-4 via /towardsdatascience.com/feed
<!–
->
- بیت کوین
- bizbuildermike
- بلاکچین
- انطباق با بلاک چین
- کنفرانس بلاکچین
- مشاوران بلاک چین
- coinbase
- coingenius
- اجماع
- کنفرانس رمزنگاری
- معدنکاری رمز گشایی
- کریپتو کارنسی (رمز ارزها )
- غیر متمرکز
- DEFI
- دارایی های دیجیتال
- ethereum
- فراگیری ماشین
- رمز غیر قابل شستشو
- افلاطون
- افلاطون آی
- هوش داده افلاطون
- پلاتوبلاک چین
- PlatoData
- بازی پلاتو
- چند ضلعی
- اثبات سهام
- W3
- زفیرنت