מבט מהיר על Spark Structured Streaming + Kafka

לומדים את היסודות כיצד להשתמש בצמד העוצמתי הזה למשימות עיבוד זרמים

תמונה על ידי ניקיטה סינגהל on Unsplash

לאחרונה התחלתי ללמוד הרבה על Apache Kafka ו Apache Spark, שתי טכנולוגיות מובילות בעולם הנדסת הנתונים.

ביצעתי כמה פרויקטים באמצעותם בחודשים האחרונים; "למידת מכונה סטרימינג עם Kafka, Debezium ובנטוML" היא דוגמה. ההתמקדות שלי היא ללמוד כיצד ליצור צינורות נתונים רבי עוצמה עם הכלים המפורסמים המודרניים הללו ולקבל תחושה של היתרונות והחסרונות שלהם.

בחודשים האחרונים, כבר כיסיתי כיצד ליצור צינורות ETL באמצעות שני הכלים אך אף פעם לא באמצעותם יחד, וזה הפער שאמלא היום.

המטרה שלנו היא ללמוד את הרעיון הכללי מאחורי בניית אפליקציית סטרימינג עם Spark+Kafka ולתת מבט מהיר על המושגים העיקריים שלה באמצעות נתונים אמיתיים.

הרעיון פשוט - אפאצ'י קפקא הוא כלי להזרמת הודעות, שבו מפיקים כותבים הודעות בקצה אחד של תור (נקרא נושא) שייקרא על ידי הצרכנים מצד שני.

אבל זה כלי מאוד מורכב, שנבנה להיות שירות הודעות מבוזר עמיד, עם כל מיני ערבויות מסירה (בדיוק פעם אחת, פעם אחת, כל), אחסון הודעות ושכפול הודעות, תוך שהוא מאפשר גם גמישות, מדרגיות ותפוקה גבוהה. יש לו קבוצה רחבה יותר של מקרי שימוש, כמו תקשורת מיקרו-שירותים, מערכות אירועים בזמן אמת וזרימת צינורות ETL.

Apache Spark הוא מנוע לשינוי נתונים מבוסס זיכרון מבוזר.

זה גם כלי מאוד מורכב, שמסוגל להתחבר לכל מיני מסדי נתונים, מערכות קבצים ותשתיות ענן. היא מיועדת לפעול בסביבות מבוזרות כדי להקביל עיבוד בין מכונות, להשגת טרנספורמציות בעלות ביצועים גבוהים על ידי שימוש בפילוסופיית ההערכה העצלה שלה ובאופטימיזציות של שאילתות.

החלק המגניב בו הוא שבסופו של יום, הקוד הוא רק שאילתת SQL הרגילה שלך או (כמעט) סקריפט Python+Pandas שלך, כאשר כל הכישוף מופשט תחת ממשק API ברמה גבוהה ידידותי למשתמש.

הצטרף לשתי הטכנולוגיות הללו ויש לנו התאמה מושלמת לבנות צינור ETL ​​סטרימינג.

אנו נשתמש בנתונים מחיישני תנועה בעיר Belo Horizonte (BH), בירת Minas Gerais (ברזיל). זהו מערך נתונים ענק המכיל מדידות של זרימת התנועה במספר מקומות בעיר. כל חיישן מזהה מעת לעת את סוג הרכב הנוסע במיקום זה (מכונית, אופנוע, אוטובוס/משאית), מהירותו ואורכו (ומידע אחר שלא נשתמש בו).

מערך הנתונים הזה מייצג בדיוק את אחד מהיישומים הקלאסיים למערכות סטרימינג - קבוצה של חיישנים ששולחים את הקריאות שלהם ברציפות מהשטח.

בתרחיש זה, אפאצ'י קפקא יכול לשמש כשכבת הפשטה בין החיישנים ליישומים שצורכים את הנתונים שלהם.

קפקא משמש כשכבת הפשטה בין מקורות ושירותים. תמונה לפי מחבר.

עם סוג זה של תשתית, אפשר לבנות כל מיני (מה שנקרא) מערכות מונעות אירועים בזמן אמת, כמו תוכנית לזיהוי והתראה על פקקים כאשר מספר כלי הרכב גדל לפתע עם ירידה במהירות הממוצעת.

וכאן נכנס לתמונה אפאצ'י ספארק.

יש לו מודול מקורי לעיבוד זרם הנקרא Spark Structured Streaming, שיכול להתחבר לקפקא ולעבד את המסרים שלו.

הקמת הסביבה

כל מה שאתה צריך זה docker ו-docker-compose.

אנו נשתמש בתצורת קובץ docker-compose המבוססת על המאגרים הבאים: קישור ניצוץ, קישור קפקא.

השמיים ./src נפח הוא המקום שבו אנחנו הולכים לשים את התסריטים שלנו.

כדי להתחיל את הסביבה, פשוט רוץ

docker-compose למעלה

כל הקוד זמין בזה מאגר GitHub.

אחד הדברים שהכי אהבתי כשהתחלתי ללמוד את Spark היה הדמיון בין הקוד הכתוב עבורו לבין התסריטים הרגילים שלי של python+pandas. היה קל מאוד להגר.

בעקבות אותו היגיון, מודול הסטרימינג של Spark דומה מאוד לקוד הניצוץ הרגיל, מה שמקל על המעבר מיישומי האצווה לאלה של הזרמים.

עם זאת, בסעיפים הבאים, נתמקד בלימוד הספציפיות של סטרימינג מובנה של Spark, כלומר אילו תכונות חדשות יש לה.

העבודה הראשונה שלנו

בואו נתחיל לאט ונבנה דוגמה לצעצוע

הדבר הראשון שצריך לעשות הוא ליצור נושא קפקא שממנו הניצוץ שלנו יצרוך את המסרים.

הדבר נעשה על ידי גישה למסוף המכולות של קפקא וביצוע:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

כדי לדמות מפיק כותב הודעות בנושא זה, בואו נשתמש ב- קפקא-קונסולה-מפיק. גם בתוך המיכל:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

מעתה, כל שורה שהוקלדה בטרמינל תישלח כהודעה לנושא הבדיקה. התו ":" משמש להפרדת המפתח והערך של ההודעה (מפתח:ערך).

בואו ניצור משרת Spark כדי לצרוך את הנושא הזה.

יש לשים את הקוד בתוך /src/streaming תיקייה (שום דבר מיוחד, רק התיקיה שבחרתי).

הדבר העיקרי שיש לציין הוא שאנו משתמשים בתכונות readStream ו writeStream, במקום קריאה וכתיבה רגילה. זה ההיבט העיקרי שגורם ל-Spark להתייחס לעבודה שלנו כאל אפליקציית סטרימינג.

כדי להתחבר לקפקא, יש צורך לציין את השרת והנושא. האפשרות starterOffsets=“המוקדם ביותר" אומר לספארק לקרוא את הנושא מההתחלה. כמו כן, כי קפקא מאחסן את המסרים שלו בינרי צורה, יש לפענח אותם מחרוזת.

האפשרויות האחרות ייבדקו עוד יותר.

כעת, בואו ניגש למיכל Spark ונפעיל את העבודה.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

לאחר מספר שניות של תצורה, הוא יתחיל לצרוך את הנושא.

ניצוץ הודעות של קפקא. תמונה לפי מחבר.

Spark Streaming עובד ב מיקרו-אצווה מצב, וזו הסיבה שאנו רואים את מידע ה"אצווה" כאשר הוא צורך את ההודעות.

Micro-batching הוא קצת בין הזרמה "אמיתית" מלאה, שבה כל ההודעות מעובדות בנפרד כשהן מגיעות, לבין האצווה הרגילה, שבה הנתונים נשארים סטטיים ונצרכים לפי דרישה. Spark ימתין זמן מה בניסיון לצבור הודעות כדי לעבד אותן יחד, תוך הפחתת התקורה והגברת האחזור. זה יכול להיות מכוון לצרכים שלך.

אני לא טיפוס סופר מהיר, אז Spark מעבד את ההודעה לפני שאוכל לכלול חדשים באצווה הנוכחית.

וזו הייתה עבודת הסטרימינג הראשונה שלנו!

אני מקווה שהבנתם את ההרגשה: זה לא קשה לקודד עבודת עיבוד זרם, אבל יש כמה פתרונות.

כתיבת נתונים לזרם קפקא

עכשיו הגיע הזמן להתחיל לשחק עם נתוני החיישן.

ניתן להוריד את zip קובץ מאוגוסט 2022 וחלץ אותו לתוך /נתונים כרך. הנתונים הם במקור ב-JSON ולוקחים כ-23Gb של מקום. הדבר הראשון שצריך לעשות הוא להמיר אותו לפרקט כדי לייעל את שטח הדיסק וזמן הקריאה.

עבודות הניצוץ לעשות זאת מפורטות במאגר GitHub, כל מה שאתה צריך לעשות הוא לבצע אותן:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

בהתאם למכשיר שלך, הביצוע עשוי להימשך זמן מה. אבל זה משתלם, גודל קובץ הפרקט הסופי הוא ~1Gb (קטן יותר מפי 20) ומהיר הרבה יותר לקריאה.

אנחנו צריכים גם ליצור את נושא קפקא כדי לקבל את ההודעות שלנו:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

לחלופין, אם אתה רוצה להציג את ההודעות המגיעות, אפשר להגדיר צרכן קונסולה.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

כתיבת נתונים על נושא קפקא היא קלה, אבל יש בה כמה פרטים.

בסטרימינג מובנה, התנהגות ברירת המחדל היא לא לנסות להסיק את סכימת הנתונים (עמודות וסוגיהן), אז אנחנו צריכים להעביר אחת.

הודעות קפקא הן רק צמדי מחרוזות בינאריות של מפתח-ערך, אז אנחנו צריכים לייצג את הנתונים שלנו בפורמט הזה. ניתן להשיג זאת בקלות על ידי המרת כל השורות למחרוזות JSON, קידודן בבינארי ואחסון התוצאה בעמודת "ערך".

הפיכת עמודות למחרוזות JSON. תמונה לפי מחבר.

מפתחות הודעות חשובים מאוד בקפקא, אבל הם לא יהיו שימושיים במבחנים שלנו, אז לכל ההודעות יהיה אותו דבר.

כפי שצוין קודם, מערך הנתונים הזה הוא עצום, אז הגבלתי את מספר ההודעות שהוכנסו ל-500,000.

לבסוף, אנו עוברים את השרת והנושא של קפקא ו"מחסום מיקום" שבו הניצוץ יאחסן את התקדמות הביצוע, שימושי להתאוששות משגיאות.

ביצוע העבודה:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
הכנסת נתונים לקפקא. תמונה לפי מחבר.

משמאל, עבודת Spark קורא את הקובץ, מימין, א קפקא-קונסולה-צרכן מציג את ההודעות המגיעות.

נושא התנועה שלנו מאוכלס וכמעט מוכן לעיבוד.

חשוב לזכור שהשתמשנו בעבודת ניצוץ כדי לאכלס את הנושא שלנו רק למטרות למידה. בתרחיש אמיתי, החיישנים עצמם ישלחו קריאות ישירות לקפקא.

כדי לדמות התנהגות דינמית זו, התסריט למטה כותב שורה אחת לנושא כל 1 שניות.

מצבי פלט - ספירת מספר כלי הרכב לפי סוג

ממשיכים הלאה, בואו ניצור עבודה כדי לספור את מספר כלי הרכב לפי סוג.

העמודה "Classificação" (סיווג) מכילה את סוג הרכב שזוהה.

בזמן שאנו קוראים מהנושא, עלינו להמיר את המחרוזות הבינאריות של JSON בחזרה לפורמט העמודות.

ברגע שזה נעשה, ניתן לבנות את השאילתה כרגיל. מעניין לציין כי לב השאילתה הוא רק ה בחר,groupBy,לספור() רצף, כל השאר הוא ביחס להיגיון של סטרימינג.

אז הגיע הזמן להתייחס ל outputMode() אפשרות.

מצב הפלט של אפליקציית זרם מציין כיצד אנו רוצים לחשב (מחדש) ולכתוב את התוצאות כאשר נתונים חדשים מגיעים.

זה יכול להניח שלושה ערכים שונים:

  • צרף: הוסף רק רשומות חדשות לפלט.
  • להשלים: חשב מחדש את התוצאה המלאה עבור כל שיא חדש.
  • עדכון: עדכן רשומות שהשתנו.

מצבים אלו יכולים להיות הגיוניים או לא יכולים להיות הגיוניים בהתאם ליישום שנכתב. לדוגמה, מצב "השלם" עשוי להיות לא הגיוני אם מבוצע קיבוץ או מיון כלשהו.

בואו נבצע את העבודה במצב "שלם" ונסתכל על התוצאות.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão - משאית, Automóvel-Car, Indefinido-Undefined, Ônibus-Bus, Moto-Motocycle. תמונה לפי מחבר.

כאשר רשומות חדשות מוכנסות לזרם (ראה הטרמינל בצד ימין), העבודה מחשבת מחדש את התוצאה המלאה. זה יכול להיות שימושי במצבים שבהם סדר השורות חשוב, כמו דירוג או תחרות.

עם זאת, ייתכן שגישה זו לא תהיה אופטימלית אם מספר הקבוצות גדול מדי או שהשינויים האישיים אינם משפיעים על התוצאה הכוללת.

אז, אפשרות נוספת היא להשתמש במצב הפלט "עדכון", שיוצר הודעה חדשה רק עבור הקבוצות שהשתנו. ראה למטה:

השאילתה עם מצב פלט "עדכון". תמונה לפי מחבר.

מצב "הוסף" אינו זמין לשאילתות עם קיבוץ, כך שלא אוכל להראות באמצעות אותה עבודה. אבל אני חושב שזה המצב הפשוט ביותר, זה תמיד מוסיף שיא חדש לפלט.

מצבי פלט אלה פשוטים יותר להבנה אם אתה חושב על שמירת התוצאות בטבלה. במצב הפלט השלם, הטבלה תישכתב מחדש עבור כל הודעה חדשה שתעובד, במצב העדכון, רק השורות שבהן התרחש עדכון כלשהו, ​​והתוספת תמיד תוסיף שורה חדשה לסוף.

חלון זמן נפילה - צבירה באמצעות מרווחי זמן

במערכות סטרימינג, להודעות יש שתי חותמות זמן שונות הקשורות אליהן: זמן אירוע — הזמן שבו נוצרה ההודעה, במקרה שלנו זמן קריאת החיישן, וזמן עיבוד — כאשר ההודעה נקראת על ידי סוכן העיבוד, במקרה שלנו כאשר זה מגיע לספארק.

תכונה חשובה של כלי עיבוד זרמים היא היכולת לטפל בעיבוד זמן אירועים. חלונות נפילה הם מרווחי זמן קבועים שאינם חופפים המשמשים ליצירת צבירה באמצעות עמודות זמן אירועים. במילים פשוטות יותר, הם פורסים את ציר הזמן לפרוסות בגודל שווה כך שכל אירוע שייך למרווח בודד.

לדוגמה, ספור, כל 5 דקות, כמה כלי רכב זוהו ב-5 הדקות האחרונות.

חלון נפילה של 5 דקות. תמונה לפי מחבר.

הקוד שלהלן ממחיש זאת:

סוג זה של עיבוד יכול להיות שימושי ביותר במצבים רבים. אם נחזור לגלאי הפקקים שהוצע קודם לכן, גישה אפשרית אחת היא למדוד את המהירות הממוצעת של כלי הרכב בחלון של 10 דקות ולראות אם היא מתחת לסף מסוים.

עיבוד בזמן אירועים הוא נושא מורכב. הכל יכול לקרות כאשר מתמודדים עם זה, כמו הודעות שהולכות לאיבוד, להגיע מאוחר מדי או להתקלקל. ל-Spark יש מספר מנגנונים כדי לנסות למתן את הבעיות, כמו סימני מים, שלא נתמקד בהם.

ניתן להשתמש בחלונות זמן גם יחד עם עמודות אחרות ב- groupBy(). הדוגמה שלהלן סופרת את מספר כלי הרכב לפי סוג בחלון של 5 דקות.

חלון זמן הזזה - גמישות במרווחי הזמן

חלונות זמן הזזה הם גמישות של חלונות נפולים. במקום ליצור מרווחים שאינם חופפים, הם מאפשרים להגדיר באיזו תדירות כל מרווח ייווצר.

לדוגמה, כל 5 דקות, ספור כמה כלי רכב זוהו ב-30 הדקות האחרונות.

בגלל זה, אירועים יכולים להיות שייכים למרווחים רבים ולהיספר כמה פעמים שצריך.

כדי להגדיר חלון הזזה, פשוט העבירו את מרווח העדכון ל- חלון() פונקציה.

בוא נראה את הפלט.

כפי שאנו יכולים לראות, נוצרים לנו חלונות של 30 דקות בכל 5 דקות.

גמישות זו יכולה להיות שימושית למדי כדי להגדיר כללים עסקיים ספציפיים יותר וטריגרים מורכבים יותר. לדוגמה, גלאי הפקקים שלנו יכול לשלוח תגובות כל 5 שניות בערך ב-10 הדקות האחרונות וליצור התראה כאשר מהירות המכונית הממוצעת יורדת מתחת ל-20 קמ"ש.

זה היה מבט מהיר על המושגים העיקריים של Spark Structured Streaming וכיצד ניתן ליישם אותם עם קפקא.

Apache Kafka ו- Apache Spark הם שניהם כלים אמינים וחסונים המשמשים חברות רבות לעיבוד כמויות מדהימות של נתונים, מה שהופך אותם לאחד הזוגות החזקים ביותר במשימת עיבוד הזרם.

למדנו כיצד לאכלס, לצרוך ולעבד נושאי קפקא באמצעות משרות Spark. זו לא הייתה משימה קשה, כפי שהוזכר בפוסט, ממשק ה-API לעיבוד הזרם כמעט שווה ל-API האצווה הרגיל, עם כמה התאמות קטנות בלבד.

דנו גם במצבי פלט שונים, משהו ספציפי ליישומי סטרימינג, וכיצד ניתן להשתמש בכל אחד מהם. אחרון חביב, חקרנו אגרגציות עם חלונות זמן, אחת היכולות העיקריות של עיבוד זרם.

שוב, זה היה מבט מהיר, ואני אשאיר כמה הפניות למטה אם אתה רוצה לחקור לעומק.

מקווה שעזרתי איכשהו, תודה שקראתם! 🙂

כל הקוד זמין בזה מאגר GitHub.
נתונים בשימוש -
Contagens Volumétricas de Radares, נתוני פתיחה, מושל ברזיל

[1] תכונות צלילה עמוקות: סימון מים בסטרימינג מובנה של Apache Spark - מקס פישר בבלוג Databricks
[2] Chambers, B., & Zaharia, M. (2018). Spark: המדריך המובהק: עיבוד נתונים גדולים נעשה פשוט. "O'Reilly Media, Inc.".
[3] לוגיסטיקה, משלוח ותחבורה בזמן אמת עם אפאצ'י קפקא- קאי ואהנר
[4] בהשתתפות אפאצ'י קפקא ב-Netflix Studio ו-Finance World - בלוג קונפלואנט
[5] Spark Streaming וקפקא - https://sparkbyexamples.com/

מבט מהיר על Spark Structured Streaming + Kafka פורסם מחדש ממקור https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 דרך https:/ /towardsdatascience.com/feed

<!–

->

בול זמן:

עוד מ יועצי בלוקצ'יין