نگاهی سریع به جریان ساختار یافته اسپارک + کافکا

یادگیری اصول اولیه نحوه استفاده از این دوتایی قدرتمند برای کارهای پردازش جریان

عکس نیکیتا سینگال on می Unsplash

اخیراً شروع به مطالعه زیادی در مورد آپاچی کافکا و آپاچی اسپارک، دو فناوری پیشرو در دنیای مهندسی داده کردم.

من چندین پروژه با استفاده از آنها در چند ماه گذشته ساخته ام. "یادگیری ماشین جریانی با کافکا، دبیزیوم و بنتوML” یک نمونه است. تمرکز من این است که یاد بگیرم چگونه خطوط لوله داده قدرتمندی را با این ابزارهای معروف مدرن ایجاد کنم و مزایا و معایب آنها را درک کنم.

در ماه‌های گذشته، من قبلاً نحوه ایجاد خطوط لوله ETL را با استفاده از هر دو ابزار اما هرگز با هم استفاده نکردم، و این شکافی است که امروز پر می‌کنم.

هدف ما یادگیری ایده کلی در پشت ساخت یک برنامه استریم با اسپارک + کافکا و نگاهی سریع به مفاهیم اصلی آن با استفاده از داده های واقعی است.

ایده ساده است - آپاچی کافکا یک ابزار پخش پیام است که در آن تولیدکنندگان پیام ها را در یک انتهای صف می نویسند (به نام موضوع) از طرف دیگر توسط مصرف کنندگان خوانده شود.

اما این یک ابزار بسیار پیچیده است که به عنوان یک سرویس پیام رسانی توزیع شده انعطاف پذیر ساخته شده است، با انواع ضمانت های تحویل (دقیقا یک بار، یک بار، هر بار)، ذخیره پیام و تکرار پیام، در حالی که انعطاف پذیری، مقیاس پذیری و توان عملیاتی بالا را نیز فراهم می کند. دارای مجموعه وسیع تری از موارد استفاده، مانند ارتباطات میکروسرویس، سیستم های رویداد بلادرنگ، و جریان خطوط لوله ETL.

Apache Spark یک موتور تبدیل داده مبتنی بر حافظه توزیع شده است.

همچنین ابزار بسیار پیچیده ای است که می تواند با انواع پایگاه های داده، سیستم های فایل و زیرساخت های ابری ارتباط برقرار کند. این برای کار در محیط های توزیع شده برای موازی سازی پردازش بین ماشین ها، دستیابی به دگرگونی های با کارایی بالا با استفاده از فلسفه ارزیابی تنبل و بهینه سازی پرس و جو طراحی شده است.

نکته جالب در مورد آن این است که، تا پایان روز، کد فقط پرس و جوی SQL معمول شما یا (تقریبا) اسکریپت پایتون+پاندا شما است، با تمام جادوگری که تحت یک API سطح بالا و کاربرپسند خوب انتزاع شده است.

به این دو فناوری بپیوندید و ما یک تطابق کامل برای ایجاد خط لوله جریان ETL داریم.

ما از داده های حسگرهای ترافیک در شهر بلو هوریزونته (BH)، پایتخت میناس گرایس (برزیل) استفاده خواهیم کرد. این مجموعه داده عظیمی است که شامل اندازه گیری جریان ترافیک در چندین مکان در شهر است. هر سنسور به طور دوره ای نوع وسیله نقلیه رانده شده در آن مکان (ماشین، موتور سیکلت، اتوبوس/کامیون)، سرعت و طول آن (و سایر اطلاعاتی که ما از آنها استفاده نمی کنیم) را تشخیص می دهد.

این مجموعه داده دقیقاً یکی از کاربردهای کلاسیک برای سیستم‌های استریم را نشان می‌دهد - گروهی از حسگرها که قرائت‌های خود را به طور مداوم از میدان ارسال می‌کنند.

در این سناریو، آپاچی کافکا می تواند به عنوان یک لایه انتزاعی بین سنسورها و برنامه هایی که داده های آنها را مصرف می کنند، استفاده شود.

کافکا به عنوان یک لایه انتزاعی بین منابع و خدمات استفاده می شود. تصویر توسط نویسنده

با این نوع زیرساخت، امکان ساخت انواع (به اصطلاح) وجود دارد. سیستم های رویداد محور در زمان واقعی، مانند برنامه ای برای شناسایی و هشدار برای راه بندان در زمانی که تعداد وسایل نقلیه به طور ناگهانی با کاهش میانگین سرعت افزایش می یابد.

و اینجاست که آپاچی اسپارک وارد عمل می شود.

دارای یک ماژول بومی برای پردازش جریان به نام جریان ساختار یافته جرقه، که می تواند به کافکا متصل شود و پیام های آن را پردازش کند.

راه اندازی محیط

تنها چیزی که نیاز دارید docker و docker-compose است.

ما از یک پیکربندی فایل docker-compose بر اساس مخازن زیر استفاده خواهیم کرد: جرقه پیوند, لینک کافکا.

La ./src حجم جایی است که ما اسکریپت های خود را قرار می دهیم.

برای شروع محیط کافیست اجرا کنید

docker-compose

تمام کدها در این موجود است مخزن GitHub.

یکی از چیزهایی که هنگام شروع مطالعه Spark بیشتر دوست داشتم، شباهت کدهای نوشته شده برای آن و اسکریپت های معمول پایتون + پانداهای من بود. مهاجرت خیلی راحت بود.

با پیروی از همین منطق، ماژول استریم اسپارک بسیار شبیه به کد جرقه معمولی است و مهاجرت از برنامه های دسته ای به برنامه های استریم را آسان می کند.

با این اوصاف، در بخش‌های بعدی، ما بر روی یادگیری ویژگی‌های استریم ساختار یافته Spark تمرکز خواهیم کرد، یعنی اینکه چه ویژگی‌های جدیدی دارد.

اولین کار ما

بیایید آهسته شروع کنیم و یک نمونه اسباب بازی بسازیم

اولین کاری که باید انجام دهید این است که یک موضوع کافکا ایجاد کنید که از آنجا کار جرقه ما پیام ها را مصرف کند.

این کار توسط دسترسی به ترمینال کانتینری کافکا و اجرا کردن:

kafka-topics.sh --ایجاد --bootstrap-server localhost:9092 --topic test_topic

برای شبیه سازی تهیه کننده ای که در مورد این موضوع پیام می نویسد، از آن استفاده می کنیم کافکا-کنسول-تولیدکننده. همچنین داخل ظرف:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

از این پس هر خطی که در ترمینال تایپ شود به عنوان یک پیام به موضوع آزمون ارسال می شود. کاراکتر ":" برای جدا کردن کلید و مقدار پیام (key:value) استفاده می شود.

بیایید یک Spark job برای مصرف این موضوع ایجاد کنیم.

کد باید در داخل قرار داده شود /src/streaming پوشه (هیچ چیز خاصی نیست، فقط پوشه ای که من انتخاب کردم).

نکته کلیدی که باید توجه داشت این است که ما از ویژگی ها استفاده می کنیم readStream و writeStream، به جای خواندن و نوشتن معمولی این جنبه اصلی است که باعث می شود Spark با کار ما به عنوان یک برنامه استریم برخورد کند.

برای اتصال به کافکا باید سرور و موضوع را مشخص کنید. گزینه startingOffsets=“Earliest” به اسپارک می گوید که موضوع را از ابتدا بخواند. همچنین، زیرا کافکا پیام های خود را در آن ذخیره می کند دوتایی شکل، آنها باید رمزگشایی شوند رشته.

گزینه های دیگر بیشتر مورد بررسی قرار خواهند گرفت.

حالا بیایید به ظرف Spark دسترسی پیدا کنیم و کار را اجرا کنیم.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

پس از چند ثانیه پیکربندی، شروع به مصرف موضوع می کند.

جرقه پیام های کافکا. تصویر توسط نویسنده

Spark Streaming در کار می کند میکرو بچینگ حالت، و به همین دلیل است که وقتی پیام‌ها را مصرف می‌کند، اطلاعات «دسته‌ای» را می‌بینیم.

Micro-batching تا حدودی بین استریم کامل «واقعی» است، که در آن همه پیام‌ها به صورت جداگانه پردازش می‌شوند، و دسته‌ای معمول، که در آن داده‌ها ثابت می‌مانند و بر حسب تقاضا مصرف می‌شوند. Spark مدتی صبر می کند تا پیام ها را جمع آوری کند تا آنها را با هم پردازش کند و هزینه های اضافی را کاهش دهد و تاخیر را افزایش دهد. این را می توان با نیازهای شما تنظیم کرد.

من تایپ‌کننده فوق‌العاده سریع نیستم، بنابراین Spark قبل از اینکه بتوانم پیام‌های جدید را در دسته فعلی اضافه کنم، پیام را پردازش می‌کند.

و این اولین کار پخش جریانی ما بود!

امیدوارم این احساس را داشته باشید: کدنویسی یک کار پردازش جریانی کار سختی نیست، اما مشکلاتی وجود دارد.

نوشتن داده در جریان کافکا

اکنون زمان شروع بازی با داده های سنسور است.

شما می توانید دانلود کنید زیپ فایل از آگوست 2022 و استخراج آن در /داده ها جلد. داده ها در اصل JSON هستند و حدود 23 گیگابایت فضا می گیرند. اولین کاری که باید انجام دهید این است که آن را به پارکت تبدیل کنید تا فضای دیسک و زمان خواندن بهینه شود.

کارهای spark برای انجام این کار در مخزن GitHub به تفصیل آمده است، تنها کاری که باید انجام دهید این است که آنها را اجرا کنید:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

بسته به دستگاه شما، اجرا ممکن است کمی طول بکشد. اما به درد می خورد، اندازه فایل پارکت نهایی ~ 1 گیگابیت است (بیش از 20 برابر کوچکتر) و خواندن آن بسیار سریعتر است.

ما همچنین باید موضوع کافکا را برای دریافت پیام های خود ایجاد کنیم:

kafka-topics.sh --ایجاد --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

در صورت تمایل، اگر می‌خواهید پیام‌های دریافتی را نمایش دهید، می‌توانید یک مصرف‌کننده کنسول راه‌اندازی کنید.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

نوشتن داده در مورد یک موضوع کافکا آسان است، اما دارای برخی جزئیات است.

در جریان ساخت‌یافته، رفتار پیش‌فرض این است که سعی نکنید طرحواره داده (ستون‌ها و انواع آن‌ها) را استنتاج کنیم، بنابراین باید یکی را پاس کنیم.

پیام های کافکا فقط جفت رشته های دودویی کلید-مقدار هستند، بنابراین باید داده های خود را در این قالب نمایش دهیم. با تبدیل همه ردیف‌ها به رشته‌های JSON، کدگذاری آن‌ها به صورت باینری و ذخیره نتیجه در ستون «مقدار» به راحتی می‌توان به این امر دست یافت.

تبدیل ستون ها به رشته های JSON. تصویر توسط نویسنده

کلیدهای پیام در کافکا بسیار مهم هستند، اما در تست های ما کاربردی نخواهند داشت، بنابراین همه پیام ها یکسان خواهند داشت.

همانطور که قبلا ذکر شد، این مجموعه داده عظیم است، بنابراین تعداد پیام های درج شده را به 500,000 محدود کردم.

در نهایت سرور کافکا و موضوع و یک «مکان بازرسیجایی که جرقه پیشرفت اجرا را ذخیره می‌کند و برای بازیابی خطاها مفید است.

اجرای کار:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
درج داده ها در کافکا تصویر توسط نویسنده

در سمت چپ، کار Spark فایل را می خواند، در سمت راست، a کافکا-کنسول-مصرف کننده پیام های رسیده را نمایش می دهد.

موضوع ترافیک ما پر شده و تقریباً آماده پردازش است.

مهم است که به یاد داشته باشید که ما از یک spark job برای پر کردن موضوع خود فقط برای اهداف یادگیری استفاده کردیم. در یک سناریوی واقعی، خود حسگرها خوانش‌ها را مستقیماً برای کافکا ارسال می‌کنند.

برای شبیه سازی این رفتار پویا، اسکریپت زیر هر 1 ثانیه یک ردیف به موضوع می نویسد.

حالت های خروجی - شمارش تعداد وسایل نقلیه بر اساس نوع

در ادامه، بیایید شغلی ایجاد کنیم تا تعداد وسایل نقلیه را بر اساس نوع بشماریم.

ستون "Classificação" (طبقه بندی) حاوی نوع وسیله نقلیه شناسایی شده است.

همانطور که از موضوع می خوانیم، باید رشته های باینری JSON را به فرمت ستونی برگردانیم.

پس از انجام این کار، پرس و جو می تواند به طور معمول ساخته شود. جالب است بدانید که قلب پرس و جو فقط همان است را انتخاب کنید()دسته بندی بر اساس()تعداد دفعات مشاهده() دنباله، بقیه نسبت به منطق جریان است.

پس وقت آن است که به آن بپردازیم حالت خروجی() گزینه.

حالت خروجی یک برنامه استریم مشخص می کند که چگونه می خواهیم نتایج را با رسیدن داده های جدید (دوباره) محاسبه و بنویسیم.

می تواند سه مقدار متفاوت را در نظر بگیرد:

  • ضمیمه کردن: فقط رکوردهای جدید را به خروجی اضافه کنید.
  • تکمیل: نتیجه کامل را برای هر رکورد جدید دوباره محاسبه کنید.
  • بروزرسانی: سوابق تغییر یافته را به روز کنید.

این حالت ها بسته به برنامه نوشته شده می توانند یا نمی توانند معنی داشته باشند. به عنوان مثال، اگر گروه بندی یا مرتب سازی انجام شود، حالت "کامل" ممکن است معنی نداشته باشد.

بیایید کار را در حالت "کامل" اجرا کنیم و به نتایج نگاه کنیم.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão - کامیون، خودرو-ماشین، نامشخص-نامشخص، اتوبوس-اتوبوس، موتور-موتوسیکلت. تصویر توسط نویسنده

همانطور که رکوردهای جدید در جریان درج می شوند (ترمینال را در سمت راست ببینید)، کار نتیجه کامل را دوباره محاسبه می کند. این می تواند در شرایطی که ترتیب ردیف مهم است، مانند رتبه بندی یا رقابت، مفید باشد.

با این حال، اگر تعداد گروه ها خیلی زیاد باشد یا تغییرات فردی بر نتیجه کلی تأثیر نگذارد، این رویکرد ممکن است بهینه نباشد.

بنابراین، گزینه دیگر استفاده از حالت خروجی "به روز رسانی" است که فقط برای گروه هایی که تغییر کرده اند یک پیام جدید ایجاد می کند. زیر را ببینید:

پرس و جو با حالت خروجی "به روز رسانی". تصویر توسط نویسنده

حالت "پیوست" برای جستجوهای دارای گروه بندی در دسترس نیست، بنابراین من نمی توانم با استفاده از همان کار نشان دهم. اما من فکر می کنم که این ساده ترین حالت، آن است همیشه یک رکورد جدید به خروجی اضافه می کند.

اگر به ذخیره نتایج در جدول فکر می کنید، درک این حالت های خروجی ساده تر است. در حالت خروجی کامل، جدول برای هر پیام جدیدی که پردازش می‌شود، بازنویسی می‌شود، در حالت به‌روزرسانی، فقط خطوطی که برخی به‌روزرسانی‌ها در آن‌ها رخ داده است، و پیوست همیشه یک خط جدید به انتها اضافه می‌کند.

پنجره زمان غلت - جمع آوری با استفاده از فواصل زمانی

در سیستم‌های استریم، پیام‌ها دو مهر زمانی متفاوت دارند: زمان رویداد - زمانی که پیام ایجاد شد، در مورد ما زمان خواندن حسگر، و زمان پردازش - زمانی که پیام توسط عامل پردازش خوانده می‌شود، در مورد ما وقتی به اسپارک می رسد.

یکی از ویژگی‌های مهم ابزارهای پردازش جریانی، توانایی پردازش زمان رویداد است. پنجره‌های پراکنده فواصل زمانی ثابتی هستند که با یکدیگر همپوشانی ندارند و برای ایجاد تجمیع با استفاده از ستون‌های رویداد-زمان استفاده می‌شوند. به بیان ساده تر، آنها خط زمانی را به برش هایی با اندازه مساوی تقسیم می کنند تا هر رویداد به یک بازه واحد تعلق داشته باشد.

به عنوان مثال، هر 5 دقیقه شمارش کنید که در 5 دقیقه گذشته چند وسیله نقلیه شناسایی شده است.

پنجره غلتشی 5 دقیقه تصویر توسط نویسنده

کد زیر این موضوع را نشان می دهد:

این نوع پردازش می تواند در بسیاری از موقعیت ها بسیار مفید باشد. با بازگشت به آشکارساز ترافیکی که قبلاً پیشنهاد شد، یک روش ممکن این است که میانگین سرعت وسایل نقلیه را در یک پنجره 10 دقیقه اندازه گیری کنیم و ببینیم که آیا زیر یک آستانه مشخص است یا خیر.

پردازش زمان رویداد یک موضوع پیچیده است. هنگام مواجهه با آن همه چیز ممکن است اتفاق بیفتد، مانند گم شدن پیام‌ها، دیر رسیدن یا از کار افتادن آن‌ها. Spark مکانیسم های مختلفی برای کاهش مشکلات دارد، مانند متن، که ما روی آن تمرکز نخواهیم کرد.

پنجره‌های زمانی را می‌توان به همراه ستون‌های دیگر در آن استفاده کرد دسته بندی بر اساس(). مثال زیر تعداد وسایل نقلیه را بر اساس نوع در یک پنجره 5 دقیقه ای شمارش می کند.

پنجره زمانی کشویی - انعطاف پذیری در فواصل زمانی

پنجره های زمانی کشویی انعطاف پذیری پنجره های غلتشی هستند. به جای ایجاد فواصل غیر همپوشانی، آنها اجازه می‌دهند هر بازه زمانی ایجاد شود.

به عنوان مثال، هر 5 دقیقه، شمارش کنید که در 30 دقیقه گذشته چند وسیله نقلیه شناسایی شده است.

به همین دلیل، رویدادها می توانند به فواصل زیادی تعلق داشته باشند و به تعداد دفعات مورد نیاز شمارش شوند.

برای تعریف یک پنجره کشویی، کافی است فاصله به روز رسانی را به پنجره منتقل کنید پنجره() عملکرد.

خروجی را ببینیم.

همانطور که می بینیم، ما هر 30 دقیقه 5 دقیقه پنجره ایجاد می کنیم.

این انعطاف‌پذیری می‌تواند برای تعریف قوانین تجاری خاص‌تر و محرک‌های پیچیده‌تر بسیار مفید باشد. برای مثال، ردیاب ترافیک ما می‌تواند هر 5 ثانیه یک بار در 10 دقیقه گذشته پاسخ‌هایی ارسال کند و زمانی که میانگین سرعت خودرو به زیر 20 کیلومتر در ساعت می‌رسد، هشدار ایجاد می‌کند.

این نگاهی سریع به مفاهیم اصلی جریان ساختاری جرقه و نحوه اعمال آنها با کافکا بود.

آپاچی کافکا و آپاچی اسپارک هر دو ابزارهای قابل اعتماد و قوی هستند که توسط بسیاری از شرکت‌ها برای پردازش روزانه مقادیر باورنکردنی داده مورد استفاده قرار می‌گیرند و آنها را به یکی از قوی‌ترین جفت‌ها در کار پردازش جریانی تبدیل می‌کنند.

ما یاد گرفته‌ایم که چگونه موضوعات کافکا را با استفاده از Spark Jobs پر کنیم، مصرف کنیم و پردازش کنیم. این کار سختی نبود، همانطور که در پست ذکر شد، API پردازش جریان تقریباً با API معمولی دسته ای برابر است، فقط با برخی تنظیمات جزئی.

ما همچنین در مورد حالت‌های خروجی مختلف، چیزی خاص برای برنامه‌های استریم، و نحوه استفاده از هر یک بحث کرده‌ایم. آخرین اما نه کم‌اهمیت، ما انباشته‌ها را با پنجره‌های زمانی، یکی از قابلیت‌های اصلی پردازش جریان، بررسی کردیم.

باز هم، این یک نگاه سریع بود، و اگر می‌خواهید عمیق‌تر کاوش کنید، برخی از مراجع را در زیر می‌گذارم.

امیدوارم به نحوی کمکی کرده باشم، ممنون که خواندید! 🙂

تمام کدها در این موجود است مخزن GitHub.
داده های مورد استفاده -
Contagens Volumétricas de Radares, داده های باز، فرماندار برزیل

[1] ویژگی Deep Dive: Watermarking در Apache Spark Structured Streaming - مکس فیشر در وبلاگ Databricks
[2] چمبرز، بی.، و زهاریا، ام. (2018). Spark: راهنمای قطعی: پردازش کلان داده ساده شده است. "O'Reilly Media, Inc."
[3] لجستیک، حمل و نقل و حمل و نقل در زمان واقعی با آپاچی کافکا- کای وانر
[4] با حضور آپاچی کافکا در استودیو نتفلیکس و دنیای مالی - وبلاگ همخوان
[5] اسپارک استریمینگ و کافکا — https://sparkbyexamples.com/

نگاهی سریع به جریان ساختار یافته Spark + کافکا بازنشر شده از منبع https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9-4 via /towardsdatascience.com/feed

<!–

->

تمبر زمان:

بیشتر از مشاوران بلاک چین