Nhìn nhanh về truyền phát có cấu trúc Spark + Kafka

Tìm hiểu những kiến ​​thức cơ bản về cách sử dụng bộ đôi mạnh mẽ này cho các tác vụ xử lý luồng

Photo by Nikhita Singhal on Unsplash

Gần đây tôi bắt đầu nghiên cứu rất nhiều về Apache Kafka và Apache Spark, hai công nghệ hàng đầu trong thế giới kỹ thuật dữ liệu.

Tôi đã thực hiện một số dự án sử dụng chúng trong vài tháng qua; “Truyền trực tuyến học máy với Kafka, Debezium và BentoML” là một ví dụ. Trọng tâm của tôi là tìm hiểu cách tạo các đường dẫn dữ liệu mạnh mẽ bằng các công cụ nổi tiếng hiện đại này và hiểu được ưu điểm và nhược điểm của chúng.

Trong những tháng vừa qua, tôi đã trình bày cách tạo quy trình ETL bằng cả hai công cụ nhưng chưa bao giờ sử dụng chúng cùng nhau và đó là khoảng trống mà tôi sẽ lấp đầy hôm nay.

Mục tiêu của chúng tôi là tìm hiểu ý tưởng chung đằng sau việc xây dựng một ứng dụng phát trực tuyến với Spark+Kafka và xem nhanh các khái niệm chính của nó bằng cách sử dụng dữ liệu thực.

Ý tưởng rất đơn giản - Apache Kafka là một công cụ truyền phát tin nhắn, trong đó các nhà sản xuất viết tin nhắn ở một đầu của hàng đợi (được gọi là chủ đề) để người tiêu dùng đọc ở mặt khác.

Nhưng đó là một công cụ rất phức tạp, được xây dựng để trở thành một dịch vụ nhắn tin phân tán linh hoạt, với tất cả các loại đảm bảo gửi (chính xác một lần, một lần, bất kỳ), lưu trữ tin nhắn và sao chép tin nhắn, đồng thời cho phép tính linh hoạt, khả năng mở rộng và thông lượng cao. Nó có nhiều trường hợp sử dụng hơn, như giao tiếp vi dịch vụ, hệ thống sự kiện thời gian thực và quy trình phát trực tuyến ETL.

Apache Spark là một công cụ chuyển đổi dữ liệu dựa trên bộ nhớ phân tán.

Nó cũng là một công cụ rất phức tạp, có thể kết nối với tất cả các loại cơ sở dữ liệu, hệ thống tệp và cơ sở hạ tầng đám mây. Nó được thiết kế để hoạt động trong môi trường phân tán nhằm xử lý song song giữa các máy, đạt được các chuyển đổi hiệu suất cao bằng cách sử dụng triết lý đánh giá lười biếng và tối ưu hóa truy vấn.

Điều thú vị ở đây là, đến cuối ngày, mã chỉ là truy vấn SQL thông thường của bạn hoặc (gần như) tập lệnh Python+pandas của bạn, với tất cả các phép thuật phù thủy được trừu tượng hóa dưới một API cấp cao thân thiện với người dùng.

Kết hợp hai công nghệ này và chúng tôi có sự kết hợp hoàn hảo để xây dựng quy trình phát trực tuyến ETL.

Chúng tôi sẽ sử dụng dữ liệu từ các cảm biến giao thông ở thành phố Belo Horizonte (BH), thủ đô của Minas Gerais (Brazil). Đó là một tập dữ liệu khổng lồ chứa các phép đo lưu lượng giao thông ở một số nơi trong thành phố. Mỗi cảm biến sẽ phát hiện định kỳ loại phương tiện đang di chuyển tại địa điểm đó (ô tô, xe máy, xe buýt/xe tải), tốc độ và chiều dài của phương tiện đó (và các thông tin khác mà chúng tôi sẽ không sử dụng).

Tập dữ liệu này thể hiện chính xác một trong những ứng dụng cổ điển dành cho hệ thống phát trực tuyến - một nhóm cảm biến gửi kết quả đọc liên tục từ hiện trường.

Trong trường hợp này, Apache Kafka có thể được sử dụng làm lớp trừu tượng giữa các cảm biến và ứng dụng sử dụng dữ liệu của chúng.

Kafka được sử dụng như một lớp trừu tượng giữa các nguồn và dịch vụ. Hình ảnh của tác giả.

Với loại cơ sở hạ tầng này, có thể xây dựng tất cả các loại (cái gọi là) hệ thống hướng sự kiện thời gian thực, giống như một chương trình phát hiện và cảnh báo ùn tắc giao thông khi số lượng phương tiện tăng đột ngột cùng với tốc độ trung bình giảm.

Và đó là lúc Apache Spark phát huy tác dụng.

Nó có một mô-đun riêng để xử lý luồng được gọi là Truyền phát có cấu trúc Spark, có thể kết nối với Kafka và xử lý tin nhắn của nó.

Thiết lập môi trường

Tất cả những gì bạn cần là docker và docker-compose.

Chúng tôi sẽ sử dụng cấu hình tệp soạn thảo docker dựa trên các kho lưu trữ sau: liên kết tia lửa, liên kết kafka.

Sản phẩm ./src khối lượng là nơi chúng tôi sẽ đặt tập lệnh của mình.

Để khởi động môi trường, chỉ cần chạy

docker-soạn lên

Tất cả các mã có sẵn trong này Kho GitHub.

Một trong những điều tôi thích nhất khi bắt đầu nghiên cứu Spark là sự giống nhau giữa mã viết cho nó và các tập lệnh python+pandas thông thường của tôi. Việc di cư rất dễ dàng.

Theo cùng một logic, mô-đun phát trực tuyến của Spark rất giống với mã tia lửa thông thường, giúp dễ dàng di chuyển từ ứng dụng hàng loạt sang ứng dụng truyền phát.

Như đã nói, trong các phần sau, chúng ta sẽ tập trung vào việc tìm hiểu các đặc điểm cụ thể của tính năng phát trực tuyến có cấu trúc Spark, tức là nó có những tính năng mới nào.

Công việc đầu tiên của chúng tôi

Hãy bắt đầu chậm rãi và xây dựng một ví dụ về đồ chơi

Điều đầu tiên cần làm là tạo một chủ đề Kafka để từ đó công việc tia lửa của chúng ta sẽ tiếp nhận các thông điệp.

Điều này được thực hiện bởi truy cập vào bến container Kafka và thực hiện:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Để mô phỏng nhà sản xuất viết tin nhắn về chủ đề này, hãy sử dụng kafka-console-nhà sản xuất. Ngoài ra bên trong container:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

Từ bây giờ, mọi dòng gõ vào terminal sẽ được gửi dưới dạng tin nhắn đến chủ đề kiểm tra. Ký tự “:” được dùng để phân tách khóa và giá trị của tin nhắn (key:value).

Hãy tạo một công việc Spark để tiếp thu chủ đề này.

Mã cần phải được đặt bên trong /src/phát trực tuyến thư mục (không có gì đặc biệt, chỉ là thư mục tôi đã chọn).

Điều quan trọng cần lưu ý là chúng tôi đang sử dụng các thuộc tính đọc dòng ghi dòng, thay vì đọc và viết thông thường. Đây là khía cạnh chính khiến Spark coi công việc của chúng tôi như một ứng dụng phát trực tuyến.

Để kết nối với Kafka, cần chỉ định máy chủ và chủ đề. tùy chọn bắt đầuOffsets=“sớm nhất” yêu cầu Spark đọc chủ đề ngay từ đầu. Ngoài ra, vì Kafka lưu trữ thông điệp của nó trong nhị phân dạng, chúng cần được giải mã thành chuỗi.

Các lựa chọn khác sẽ được khám phá thêm.

Bây giờ, hãy truy cập vào vùng chứa Spark và chạy công việc.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Sau vài giây cấu hình, nó sẽ bắt đầu sử dụng chủ đề.

Những thông điệp tiêu tốn tia lửa từ Kafka. Hình ảnh của tác giả.

Spark Streaming hoạt động trong trộn vi mô chế độ và đó là lý do tại sao chúng tôi thấy thông tin "lô" khi nó sử dụng tin nhắn.

Phân lô vi mô nằm giữa tính năng phát trực tuyến hoàn toàn “thực sự”, trong đó tất cả các tin nhắn được xử lý riêng lẻ khi chúng đến và lô thông thường, trong đó dữ liệu vẫn ở trạng thái tĩnh và được sử dụng theo yêu cầu. Spark sẽ đợi một thời gian để cố gắng tích lũy tin nhắn để xử lý chúng cùng nhau, giảm chi phí hoạt động và tăng độ trễ. Điều này có thể được điều chỉnh theo nhu cầu của bạn.

Tôi không phải là người đánh máy siêu nhanh, vì vậy Spark xử lý tin nhắn trước khi tôi có thể đưa tin nhắn mới vào lô hiện tại.

Và đó là công việc phát trực tuyến đầu tiên của chúng tôi!

Tôi hy vọng bạn hiểu được: không khó để viết mã một công việc xử lý luồng, nhưng có một số vấn đề.

Ghi dữ liệu vào luồng Kafka

Bây giờ là lúc bắt đầu thử nghiệm với dữ liệu cảm biến.

Bạn có thể tải về zip tệp từ tháng 2022 năm XNUMX và giải nén nó vào /dữ liệu âm lượng. Dữ liệu ban đầu ở dạng JSON và chiếm khoảng 23Gb dung lượng. Điều đầu tiên cần làm là chuyển nó sang dạng parquet để tối ưu hóa dung lượng ổ đĩa và thời gian đọc.

Các công việc tia lửa để thực hiện việc này được trình bày chi tiết trong kho GitHub, tất cả những gì bạn cần làm là thực thi chúng:

gửi tia lửa /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Tùy thuộc vào máy của bạn, quá trình thực thi có thể mất một chút thời gian. Nhưng nó trả tiền, kích thước tệp sàn gỗ cuối cùng là ~ 1Gb (nhỏ hơn 20 lần) và đọc nhanh hơn nhiều.

Chúng tôi cũng cần tạo chủ đề Kafka để nhận tin nhắn của mình:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic Traffic_sensor

Theo tùy chọn, nếu bạn muốn hiển thị các tin nhắn đến, bạn có thể thiết lập ứng dụng khách bảng điều khiển.

kafka-console-consumer.sh --topic Traffic_sensor --bootstrap-server localhost:9092

Viết dữ liệu về chủ đề Kafka rất dễ nhưng cần một số chi tiết.

Trong truyền phát có cấu trúc, hành vi mặc định là không cố gắng suy ra lược đồ dữ liệu (cột và loại của chúng), vì vậy chúng ta cần chuyển một lược đồ.

Thông báo Kafka chỉ là cặp chuỗi nhị phân khóa-giá trị, vì vậy chúng tôi cần thể hiện dữ liệu của mình ở định dạng này. Điều này có thể dễ dàng đạt được bằng cách chuyển đổi tất cả các hàng thành chuỗi JSON, mã hóa chúng ở dạng nhị phân và lưu trữ kết quả trong cột “giá trị”.

Chuyển đổi các cột thành chuỗi JSON. Hình ảnh của tác giả.

Khóa tin nhắn rất quan trọng trong Kafka, nhưng chúng sẽ không hữu ích trong các thử nghiệm của chúng tôi, vì vậy tất cả các tin nhắn sẽ giống nhau.

Như đã đề cập trước đó, tập dữ liệu này LỚN nên tôi đã giới hạn số lượng tin nhắn được chèn ở mức 500,000.

Cuối cùng, chúng tôi chuyển máy chủ và chủ đề Kafka và một “trạm kiểm soátVị trí” nơi tia lửa sẽ lưu trữ tiến trình thực thi, hữu ích để khôi phục sau lỗi.

Thực hiện công việc:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Chèn dữ liệu vào Kafka. Hình ảnh của tác giả.

Ở bên trái, công việc Spark đọc tệp, ở bên phải, một kafka-console-người tiêu dùng hiển thị các tin nhắn đến.

Chủ đề lưu lượng truy cập của chúng tôi đã được điền và gần như sẵn sàng để được xử lý.

Điều quan trọng cần nhớ là chúng tôi đã sử dụng công việc tia lửa để đưa vào chủ đề của mình chỉ nhằm mục đích học tập. Trong tình huống thực tế, chính các cảm biến sẽ gửi kết quả đọc trực tiếp đến Kafka.

Để mô phỏng hành vi động này, tập lệnh bên dưới ghi 1 hàng vào chủ đề cứ sau 2.5 giây.

Chế độ đầu ra - Đếm số lượng xe theo loại

Tiếp tục, hãy tạo một công việc để đếm số lượng phương tiện theo loại.

Cột “Classificação” (Phân loại) chứa loại xe được phát hiện.

Khi đọc chủ đề này, chúng ta cần chuyển đổi chuỗi nhị phân JSON trở lại định dạng cột.

Khi việc này hoàn tất, truy vấn có thể được xây dựng như bình thường. Thật thú vị khi lưu ý rằng trái tim truy vấn chỉ là chọn().nhómBy().tính(), tất cả phần còn lại đều liên quan đến logic phát trực tuyến.

Vì vậy đã đến lúc phải giải quyết Chế độ đầu ra() lựa chọn.

Chế độ đầu ra của ứng dụng luồng chỉ định cách chúng tôi muốn tính toán (lại) và ghi kết quả khi có dữ liệu mới.

Nó có thể giả định ba giá trị khác nhau:

  • Nối: Chỉ thêm bản ghi mới vào đầu ra.
  • Hoàn thành: Tính lại kết quả đầy đủ cho mỗi bản ghi mới.
  • Cập nhật: Cập nhật các bản ghi đã thay đổi.

Các chế độ này có thể có hoặc không có ý nghĩa tùy thuộc vào ứng dụng được viết. Ví dụ: chế độ “hoàn thành” có thể không có ý nghĩa nếu bất kỳ việc nhóm hoặc sắp xếp nào được thực hiện.

Hãy thực hiện công việc ở chế độ “hoàn thành” và xem kết quả.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Xe tải, Ô tô tự động, Không xác định-Không xác định, Ônibus-Xe buýt, Moto-Motocycle. Hình ảnh của tác giả.

Khi các bản ghi mới được chèn vào luồng (xem thiết bị đầu cuối ở bên phải), công việc sẽ tính toán lại kết quả đầy đủ. Điều này có thể hữu ích trong các tình huống mà thứ tự hàng là quan trọng, chẳng hạn như xếp hạng hoặc cạnh tranh.

Tuy nhiên, cách tiếp cận này có thể không tối ưu nếu số lượng nhóm quá lớn hoặc những thay đổi riêng lẻ không ảnh hưởng đến kết quả chung.

Vì vậy, một tùy chọn khác là sử dụng chế độ đầu ra “cập nhật”, chế độ này chỉ tạo thông báo mới cho các nhóm đã thay đổi. Xem bên dưới:

Truy vấn với chế độ đầu ra là “cập nhật”. Hình ảnh của tác giả.

Chế độ “chắp thêm” không khả dụng cho các truy vấn có nhóm, vì vậy tôi sẽ không thể hiển thị bằng cách sử dụng cùng một công việc. Nhưng tôi nghĩ đó là chế độ đơn giản nhất, nó luôn luôn thêm một bản ghi mới vào đầu ra.

Các chế độ đầu ra này sẽ dễ hiểu hơn nếu bạn nghĩ đến việc lưu kết quả vào bảng. Ở chế độ đầu ra hoàn chỉnh, bảng sẽ được viết lại cho mỗi tin nhắn mới được xử lý, ở chế độ cập nhật, chỉ những dòng xảy ra một số cập nhật và phần bổ sung sẽ luôn thêm một dòng mới vào cuối.

Cửa sổ thời gian sụt giảm - Tổng hợp bằng cách sử dụng các khoảng thời gian

Trong các hệ thống phát trực tuyến, các tin nhắn có hai dấu thời gian khác nhau liên quan đến chúng: Thời gian sự kiện - Thời điểm tin nhắn được tạo, trong trường hợp của chúng tôi là thời gian đọc của cảm biến và Thời gian xử lý - Khi tin nhắn được đọc bởi tác nhân xử lý, trong trường hợp của chúng tôi là khi nó chạm tới Spark.

Một tính năng quan trọng của công cụ xử lý luồng là khả năng xử lý thời gian sự kiện. Cửa sổ lộn xộn là các khoảng thời gian cố định không chồng chéo được sử dụng để tổng hợp bằng cách sử dụng các cột thời gian sự kiện. Nói một cách đơn giản hơn, họ chia dòng thời gian thành các lát có kích thước bằng nhau để mỗi sự kiện thuộc về một khoảng thời gian duy nhất.

Ví dụ: đếm, cứ sau 5 phút, có bao nhiêu phương tiện được phát hiện trong 5 phút qua.

Cửa sổ lật 5 phút. Hình ảnh của tác giả.

Đoạn mã dưới đây minh họa điều này:

Kiểu xử lý này có thể cực kỳ hữu ích trong nhiều tình huống. Quay trở lại với công cụ phát hiện ùn tắc giao thông được đề xuất trước đó, một phương pháp khả thi là đo tốc độ trung bình của các phương tiện trong khoảng thời gian 10 phút và xem liệu tốc độ đó có thấp hơn một ngưỡng nhất định hay không.

Xử lý thời gian sự kiện là một chủ đề phức tạp. Mọi thứ đều có thể xảy ra khi xử lý nó, chẳng hạn như tin nhắn bị thất lạc, đến quá muộn hoặc không theo thứ tự. Spark có một số cơ chế để cố gắng giảm thiểu các vấn đề, như watermarks, mà chúng ta sẽ không tập trung vào.

Cửa sổ thời gian cũng có thể được sử dụng kết hợp với các cột khác trong nhómBy(). Ví dụ bên dưới đếm số lượng phương tiện theo loại trong khoảng thời gian 5 phút.

Cửa sổ thời gian trượt - Linh hoạt về các khoảng thời gian

Cửa sổ thời gian trượt là sự linh hoạt của cửa sổ trượt. Thay vì tạo các khoảng không chồng chéo, chúng cho phép xác định tần suất mỗi khoảng sẽ được tạo.

Ví dụ: cứ sau 5 phút, hãy đếm xem có bao nhiêu phương tiện được phát hiện trong 30 phút vừa qua.

Do đó, các sự kiện có thể thuộc nhiều khoảng và được tính bao nhiêu lần tùy ý.

Để xác định một cửa sổ trượt, chỉ cần chuyển khoảng thời gian cập nhật tới cửa sổ() chức năng.

Hãy xem kết quả đầu ra.

Như chúng ta có thể thấy, chúng ta có các cửa sổ 30 phút được tạo cứ sau 5 phút.

Tính linh hoạt này có thể khá hữu ích để xác định các quy tắc kinh doanh cụ thể hơn và các trình kích hoạt phức tạp hơn. Ví dụ: trình phát hiện kẹt xe của chúng tôi có thể gửi phản hồi cứ 5 giây một lần trong 10 phút qua và tạo cảnh báo khi tốc độ trung bình của ô tô giảm xuống dưới 20km/h.

Đây là cái nhìn nhanh về các khái niệm chính của Spark Structured Streaming và cách chúng có thể được áp dụng với Kafka.

Apache Kafka và Apache Spark đều là những công cụ mạnh mẽ và đáng tin cậy được nhiều công ty sử dụng để xử lý lượng dữ liệu đáng kinh ngạc hàng ngày, khiến chúng trở thành một trong những cặp mạnh nhất trong nhiệm vụ xử lý luồng.

Chúng tôi đã học cách điền, sử dụng và xử lý các chủ đề Kafka bằng cách sử dụng các công việc Spark. Đây không phải là một nhiệm vụ khó khăn, như đã đề cập trong bài đăng, API xử lý luồng gần như tương đương với API hàng loạt thông thường, chỉ với một số điều chỉnh nhỏ.

Chúng tôi cũng đã thảo luận về các chế độ đầu ra khác nhau, một số điều cụ thể dành cho các ứng dụng phát trực tuyến và cách sử dụng từng chế độ đó. Cuối cùng nhưng không kém phần quan trọng, chúng tôi đã khám phá các tập hợp với cửa sổ thời gian, một trong những khả năng chính của xử lý luồng.

Một lần nữa, đây chỉ là một cái nhìn nhanh và tôi sẽ để lại một số tài liệu tham khảo bên dưới nếu bạn muốn khám phá sâu hơn.

Hy vọng tôi đã giúp được phần nào đó, cảm ơn bạn đã đọc! 🙂

Tất cả các mã có sẵn trong này Kho GitHub.
Dữ liệu được sử dụng -
Contagens Khối lượng Radares, mở dữ liệu, Chính phủ Brazil

[1] Tính năng Tìm hiểu sâu: Hình mờ trong Truyền phát có cấu trúc Apache Spark — Max Fisher trên blog Databricks
[2] Chambers, B., & Zaharia, M. (2018). Spark: Hướng dẫn dứt khoát: Việc xử lý dữ liệu lớn trở nên đơn giản. “O'Reilly Media, Inc.”.
[3] Hậu cần, vận chuyển và vận chuyển theo thời gian thực với Apache Kafka— Kai Waehner
[4] Có sự góp mặt của Apache Kafka trong Netflix Studio và Finance World - Blog hợp lưu
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/

Xem nhanh về Truyền phát có cấu trúc Spark + Kafka được xuất bản lại từ nguồn https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 qua https:/ /towardsdatascience.com/feed

<!–

->

Dấu thời gian:

Thêm từ Tư vấn chuỗi khối