Spark 構造化ストリーミング + Kafka の概要

この強力なデュオをストリーム処理タスクに使用する方法の基本を学ぶ

による写真 ニキータ・シンハル on Unsplash

最近、データ エンジニアリングの世界をリードする 2 つのテクノロジーである Apache Kafka と Apache Spark についてよく勉強し始めました。

過去数か月間、これらを使用していくつかのプロジェクトを作成しました。 「Kafka、Debezium、BentoML を使用した機械学習ストリーミング」がその一例です。私の焦点は、これらの最新の有名なツールを使用して強力なデータ パイプラインを作成する方法を学び、その利点と欠点を理解することです。

ここ数か月間で、両方のツールを使用して ETL パイプラインを作成する方法については説明しましたが、同時に使用することはありませんでした。今日はそのギャップを埋めるつもりです。

私たちの目標は、Spark+Kafka を使用してストリーミング アプリケーションを構築する背後にある一般的なアイデアを学び、実際のデータを使用してその主要な概念をざっと見てみることです。

アイデアはシンプルです。Apache Kafka はメッセージ ストリーミング ツールであり、プロデューサーはキュー (キューと呼ばれる) の一方の端にメッセージを書き込みます。 トピック)もう一方の消費者が読むことができます。

しかし、これは非常に複雑なツールであり、柔軟性、スケーラビリティ、および高スループットを実現しながら、あらゆる種類の配信保証 (1 回、1 回、すべて)、メッセージ ストレージ、およびメッセージ レプリケーションを備えた復元力のある分散メッセージング サービスとして構築されています。マイクロサービス通信、リアルタイム イベント システム、ストリーミング ETL パイプラインなど、幅広いユースケースがあります。

Apache Spark は、分散メモリベースのデータ変換エンジンです。

これは非常に複雑なツールでもあり、あらゆる種類のデータベース、ファイル システム、クラウド インフラストラクチャに接続できます。分散環境で動作してマシン間の処理を並列化するように設計されており、遅延評価の哲学とクエリの最適化を使用して高パフォーマンスの変換を実現します。

これのすばらしい点は、一日の終わりには、コードが通常の SQL クエリ、または (ほぼ) Python + pandas スクリプトになり、すべての魔法が使いやすい高レベル API の下で抽象化されていることです。

これら 2 つのテクノロジーを組み合わせることで、ストリーミング ETL パイプラインを構築するのに最適な組み合わせが得られます。

ミナス ジェライス州 (ブラジル) の首都ベロオリゾンテ (BH) 市の交通センサーからのデータを使用します。これは、市内のいくつかの場所の交通流の測定値を含む巨大なデータセットです。各センサーは、その場所を走行している車両の種類 (乗用車、オートバイ、バス/トラック)、その速度と長さ (および使用しないその他の情報) を定期的に検出します。

このデータセットは、ストリーミング システムの古典的なアプリケーションの 1 つ、つまり現場から測定値を継続的に送信するセンサーのグループを正確に表しています。

このシナリオでは、Apache Kafka をセンサ​​ーとそのデータを使用するアプリケーションの間の抽象化レイヤーとして使用できます。

Kafka は、ソースとサービスの間の抽象化レイヤーとして使用されます。著者による画像。

この種のインフラストラクチャを使用すると、あらゆる種類の (いわゆる) ものを構築することができます。 リアルタイムのイベント駆動型システム、平均速度の低下に伴って車両の数が突然増加した場合に、交通渋滞を検出して警告するプログラムのようなものです。

そこで Apache Spark が活躍します。

というストリーム処理用のネイティブモジュールがあります。 Spark 構造化ストリーミング、Kafka に接続してそのメッセージを処理できます。

環境のセットアップ

必要なのは docker と docker-compose だけです。

次のリポジトリに基づいて docker-compose ファイル構成を使用します。 リンクスパーク, リンク・カフカ.

  ./src volume はスクリプトを配置する場所です。

環境を開始するには、実行するだけです

ドッカー - 構成

すべてのコードはこれで利用できます GitHubリポジトリ.

Spark の勉強を始めたときに私が最も気に入ったことの 1 つは、Spark 用に書かれたコードと、通常の Python + pandas スクリプトとの類似性でした。移行はとても簡単でした。

同じロジックに従って、Spark のストリーミング モジュールは通常の Spark コードに非常に似ているため、バッチ アプリケーションからストリーム アプリケーションへの移行が簡単になります。

そうは言っても、次のセクションでは、Spark 構造化ストリーミングの特殊性、つまり、どのような新機能があるかを学ぶことに焦点を当てます。

私たちの最初の仕事

ゆっくり始めておもちゃのサンプルを構築してみましょう

最初に行うことは、Spark ジョブがメッセージを消費する場所となる Kafka トピックを作成することです。

これは Kafka コンテナターミナルへのアクセス そして実行:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

このトピックに関するメッセージを作成するプロデューサーをシミュレートするには、 カフカコンソールプロデューサー。 コンテナ内にも次のようなものがあります。

kafka-console-Producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

これからは、ターミナルに入力されたすべての行がメッセージとしてテスト トピックに送信されます。文字「:」は、メッセージのキーと値 (キー:値) を区切るために使用されます。

このトピックを使用する Spark ジョブを作成しましょう。

コードは内部に入れる必要があります /src/ストリーミング フォルダー (特別なことは何もありません。単に私が選択したフォルダーです)。

注意すべき重要な点は、属性を使用していることです。 読み取りストリーム & ライトストリーム、 通常の読み取りと書き込みの代わりに。これは、Spark がジョブをストリーミング アプリケーションとして扱う主な側面です。

Kafka に接続するには、サーバーとトピックを指定する必要があります。オプション 開始オフセット=”最も早い」は、Spark にトピックを最初から読むように指示します。また、Kafka はメッセージを次の場所に保存するため、 バイナリ 形式なので、デコードする必要があります 文字列.

他のオプションについてはさらに検討していきます。

次に、Spark コンテナーにアクセスしてジョブを実行しましょう。

smile-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

構成の数秒後、トピックの使用が開始されます。

Kafka からのメッセージを消費する Spark。著者による画像。

Spark ストリーミングは以下で動作します マイクロバッチ処理 そのため、メッセージが消費されるときに「バッチ」情報が表示されます。

マイクロバッチ処理は、すべてのメッセージが到着時に個別に処理される完全な「真の」ストリーミングと、データが静的なままでオンデマンドで消費される通常のバッチとの中間に位置します。 Spark は、メッセージを蓄積してまとめて処理するためにしばらく待機するため、オーバーヘッドが削減され、レイテンシが増加します。これはニーズに合わせて調整できます。

私はそれほど高速に入力できるわけではないので、現在のバッチに新しいメッセージを含める前に、Spark がメッセージを処理します。

それが私たちの最初のストリーミングの仕事でした。

ストリーム処理ジョブをコーディングするのは難しくありませんが、いくつか注意点があることを感じていただければ幸いです。

Kafka ストリームへのデータの書き込み

次に、センサー データを操作してみましょう。

あなたがダウンロードすることができます ZIP 2022 年 XNUMX 月のファイルを抽出し、 /データ 音量。データは元々 JSON 形式であり、約 23 GB のスペースを必要とします。最初に行うことは、ディスク容量と読み取り時間を最適化するために、それを寄木細工に変換することです。

これを行うための Spark ジョブの詳細は GitHub リポジトリに記載されており、必要なのはそれらを実行することだけです。

スパークサブミット /src/transform_json_to_parquet.pyスパークサブミット /src/join_parquet_files.py

マシンによっては、実行に時間がかかる場合があります。しかし、その甲斐あって、最終的な寄木細工のファイル サイズは約 1Gb (20 分の XNUMX 以上小さくなります) になり、読み取りがはるかに速くなります。

メッセージを受信するために Kafka トピックを作成する必要もあります。

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic Traffic_sensor

オプションで、受信したメッセージを表示したい場合は、コンソール コンシューマを設定できます。

kafka-console-consumer.sh --topic Traffic_sensor --bootstrap-server localhost:9092

Kafka トピックにデータを書き込むのは簡単ですが、詳細がいくつかあります。

構造化ストリーミングでは、デフォルトの動作ではデータ スキーマ (列とその型) を推論しようとしないため、データ スキーマを渡す必要があります。

Kafka メッセージは単なるキーと値のバイナリ文字列ペアであるため、データをこの形式で表す必要があります。これは、すべての行を JSON 文字列に変換し、バイナリでエンコードし、結果を「値」列に保存することで簡単に実現できます。

列を JSON 文字列に変換します。著者による画像。

メッセージ キーは Kafka では非常に重要ですが、テストでは役に立たないため、すべてのメッセージに同じキーが設定されます。

前に述べたように、このデータセットは巨大であるため、挿入するメッセージの数を 500,000 に制限しました。

最後に、Kafka サーバーとトピック、そして「チェックポイントの場所」 ここに、Spark は実行の進行状況を保存し、エラーから回復するのに役立ちます。

ジョブの実行:

smile-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Kafka にデータを挿入します。著者による画像。

左側では Spark ジョブがファイルを読み取り、右側ではファイルを読み取ります。 kafka-コンソール-コンシューマー 到着したメッセージを表示します。

トラフィック トピックにはデータが入力され、ほぼ処理の準備が整いました。

学習目的でトピックを設定するために Spark ジョブを使用したことを覚えておくことが重要です。実際のシナリオでは、センサー自体が測定値を Kafka に直接送信します。

この動的な動作をシミュレートするために、以下のスクリプトは 1 秒ごとにトピックに 2.5 行を書き込みます。

出力モード — タイプ別の車両数のカウント

次に、タイプごとに車両の数を数えるジョブを作成しましょう。

「Classificação」(分類) 列には、検出された車両タイプが含まれます。

トピックを読んでいると、JSON バイナリ文字列を列形式に変換し直す必要があります。

これが完了すると、通常どおりクエリを構築できます。興味深いのは、クエリのハートがまさに select()。グループ化()。カウント() シーケンス、残りはすべてストリーミング ロジックに関連します。

そこで、次のことに対処する時期が来ました 出力モード() オプション。

ストリーム アプリケーションの出力モードは、新しいデータが到着したときに結果を (再) 計算して書き込む方法を指定します。

次の 3 つの異なる値を取ることができます。

  • 追加: 新しいレコードのみを出力に追加します。
  • 完全: 新しいレコードごとに完全な結果を再計算します。
  • アップデイト: 変更されたレコードを更新します。

これらのモードは、作成されたアプリケーションに応じて意味をなす場合もあれば、意味をなさない場合もあります。たとえば、グループ化または並べ替えが実行される場合、「完全」モードは意味をなさない場合があります。

「完了」モードでジョブを実行し、結果を見てみましょう。

smile-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — トラック、自動車-自動車、Indefinido-未定義、Ônibus-バス、Moto-オートバイ。著者による画像。

新しいレコードがストリームに挿入されると (右側の端子を参照)、ジョブは完全な結果を再計算します。これは、ランキングや競争など、行の順序が重要な状況で役立ちます。

ただし、グループの数が多すぎる場合、または個々の変更が全体の結果に影響を与えない場合、このアプローチは最適ではない可能性があります。

したがって、別のオプションは、変更されたグループに対してのみ新しいメッセージを生成する「更新」出力モードを使用することです。以下を参照してください:

出力モード「update」のクエリ。著者による画像。

「追加」モードはグループ化されたクエリでは使用できないため、同じジョブを使用して表示することはできません。でもそれが一番シンプルなモードだと思うんですが、 常に 新しいレコードを出力に追加します。

これらの出力モードは、結果をテーブルに保存することを考えると理解しやすくなります。完全出力モードでは、処理される新しいメッセージごとにテーブルが書き換えられます。更新モードでは、更新が発生した行のみが書き換えられ、追加では常に最後に新しい行が追加されます。

タンブリング時間ウィンドウ — 時間間隔を使用した集計

ストリーミング システムでは、メッセージには、メッセージに関連する 2 つの異なるタイムスタンプがあります。イベント時間 — メッセージが作成された時間 (この場合はセンサーの読み取り時間)、処理時間 — メッセージが処理エージェントによって読み取られたとき (この場合は)スパークに届きます。

ストリーム処理ツールの重要な機能は、イベント時の処理を処理できることです。タンブリング ウィンドウは、イベント時間列を使用して集計を行うために使用される、重複しない固定時間間隔です。より簡単に言うと、タイムラインを同じサイズのスライスにスライスして、各イベントが単一の間隔に属するようにします。

たとえば、過去 5 分間に検出された車両の数を 5 分ごとに数えます。

5分間のタンブリングウィンドウ。著者による画像。

以下のコードはこれを示しています。

この種の処理は、多くの状況で非常に役立ちます。先に提案された交通渋滞検出器に戻ると、考えられるアプローチの 10 つは、XNUMX 分間の車両の平均速度を測定し、それが特定のしきい値を下回っているかどうかを確認することです。

イベント時の処理は複雑なトピックです。メッセージの紛失、到着の遅れ、順序の乱れなど、対処中にあらゆることが起こる可能性があります。 Spark には、問題を軽減するためのいくつかのメカニズムがあります。 透かし、焦点を当てません。

時間ウィンドウは、 groupBy()。以下の例では、5 分間のウィンドウでタイプごとに車両の数をカウントします。

スライディング時間枠 — 時間間隔の柔軟性

スライディング タイム ウィンドウは、タンブリング ウィンドウを柔軟化したものです。重複しない間隔を作成する代わりに、各間隔を作成する頻度を定義できます。

たとえば、5 分ごとに、過去 30 分間に検出された車両の数を数えます。

そのため、イベントは多くの間隔に属し、必要に応じて何度でもカウントできます。

スライディング ウィンドウを定義するには、更新間隔を ウィンドウを使用して入力ファイルを追加します。() 関数。

出力を見てみましょう。

ご覧のとおり、30 分ごとに 5 分のウィンドウが作成されています。

この柔軟性は、より具体的なビジネス ルールやより複雑なトリガーを定義する場合に非常に役立ちます。たとえば、当社の渋滞検知器は、過去 5 分間について 10 秒ごとに応答を送信し、車の平均速度が 20km/h を下回ったときにアラートを作成できます。

ここでは、Spark Structured Streaming の主な概念と、それらを Kafka にどのように適用できるかを簡単に説明しました。

Apache Kafka と Apache Spark はどちらも信頼性が高く堅牢なツールであり、多くの企業が毎日膨大な量のデータを処理するために使用しており、ストリーム処理タスクにおいて最も強力なペアの 1 つとなっています。

Spark ジョブを使用して Kafka トピックを設定、消費、処理する方法を学習しました。これは難しい作業ではありません。投稿で述べたように、ストリーム処理 API は通常のバッチ API とほぼ同じであり、わずかな調整が加えられているだけです。

また、ストリーム アプリケーションに特有のさまざまな出力モードと、それぞれの使用方法についても説明しました。最後になりましたが、ストリーム処理の主な機能の 1 つである時間枠を使用した集計について説明しました。

繰り返しますが、これはざっと見ただけです。さらに詳しく調べたい場合は、以下に参考文献をいくつか残しておきます。

何らかの形で役に立てば幸いです、読んでいただきありがとうございます! 🙂

すべてのコードはこれで利用できます GitHubリポジトリ.
使用されるデータ —
ヴォルメトリカス・デ・ラダレス感染症, オープンデータ、ブラジル政府

【1] 機能の詳細: Apache Spark 構造化ストリーミングのウォーターマーク — Databricks ブログの Max Fisher
[2] Chambers, B.、Zaharia, M. (2018)。 Spark: 決定版ガイド: ビッグデータ処理をシンプルに。 「オライリー・メディア株式会社」。
【3] Apache Kafka を使用したリアルタイムの物流、配送、輸送— カイ・ワーナー
【4] Netflix スタジオと金融の世界で Apache Kafka をフィーチャー — Confluent ブログ
[5] Spark ストリーミングと Kafka — https://sparkbyexamples.com/

Spark Structured Streaming + Kafka の概要 ソース https://towardsdatascience.com/a-fast-look-at-spark-structed-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 (https:/ 経由) から再公開/towardsdatascience.com/feed

<!–

–>

タイムスタンプ:

より多くの ブロックチェーンコンサルタント