Spark Structured Streaming + Kafka 살펴보기

스트림 처리 작업에 이 강력한 듀오를 사용하는 방법에 대한 기본 사항 학습

님이 촬영 한 사진 니키타 싱할 on Unsplash

최근 저는 데이터 엔지니어링 분야의 두 가지 주요 기술인 Apache Kafka와 Apache Spark에 대해 많은 공부를 시작했습니다.

나는 지난 몇 달 동안 그것들을 사용하여 여러 프로젝트를 만들었습니다. “Kafka, Debezium 및 BentoML을 사용한 머신러닝 스트리밍"가 예시이다. 저의 초점은 이러한 현대적이고 유명한 도구를 사용하여 강력한 데이터 파이프라인을 만드는 방법을 배우고 그 장점과 단점을 파악하는 것입니다.

지난 몇 달 동안 저는 이미 두 도구를 모두 사용하여 ETL 파이프라인을 생성하는 방법을 다루었지만 결코 함께 사용하지는 않았습니다. 오늘은 그 공백을 메울 것입니다.

우리의 목표는 Spark+Kafka를 사용하여 스트리밍 애플리케이션을 구축하는 일반적인 아이디어를 배우고 실제 데이터를 사용하여 주요 개념을 빠르게 살펴보는 것입니다.

아이디어는 간단합니다. Apache Kafka는 생산자가 대기열( 화제) 다른 쪽에서는 소비자가 읽을 수 있습니다.

그러나 이는 모든 종류의 전달 보장(정확히 한 번, 한 번, 임의), 메시지 저장 및 메시지 복제 기능을 갖춘 동시에 유연성, 확장성 및 높은 처리량을 허용하는 탄력적인 분산 메시징 서비스로 구축된 매우 복잡한 도구입니다. 마이크로서비스 통신, 실시간 이벤트 시스템, 스트리밍 ETL 파이프라인과 같은 더 광범위한 사용 사례 세트가 있습니다.

Apache Spark는 분산 메모리 기반 데이터 변환 엔진입니다.

또한 모든 종류의 데이터베이스, 파일 시스템 및 클라우드 인프라와 연결할 수 있는 매우 복잡한 도구입니다. 분산 환경에서 작동하여 기계 간 처리를 병렬화하고 게으른 평가 철학과 쿼리 최적화를 사용하여 고성능 변환을 달성하도록 설계되었습니다.

멋진 점은 결국 코드가 일반적인 SQL 쿼리이거나 (거의) Python+pandas 스크립트가 되며 모든 마법이 사용자 친화적인 고급 API로 추상화된다는 것입니다.

이 두 가지 기술을 결합하면 스트리밍 ETL 파이프라인을 구축하는 데 완벽한 조합을 얻을 수 있습니다.

우리는 미나스 제라이스(브라질)의 수도인 벨루오리존치(BH) 시의 교통 센서에서 얻은 데이터를 사용할 것입니다. 이는 도시 내 여러 장소의 교통 흐름 측정값을 포함하는 거대한 데이터세트입니다. 각 센서는 해당 위치에서 운전하는 차량 유형(자동차, 오토바이, 버스/트럭), 속도 및 길이(및 우리가 사용하지 않을 기타 정보)를 주기적으로 감지합니다.

이 데이터 세트는 현장에서 지속적으로 판독값을 보내는 센서 그룹인 스트리밍 시스템의 고전적인 애플리케이션 중 하나를 정확하게 나타냅니다.

이 시나리오에서는 Apache Kafka를 센서와 해당 데이터를 소비하는 애플리케이션 간의 추상화 계층으로 사용할 수 있습니다.

Kafka는 소스와 서비스 간의 추상화 계층으로 사용됩니다. 작성자의 이미지.

이러한 종류의 인프라를 사용하면 소위 말하는 모든 종류의 구축이 가능합니다. 실시간 이벤트 중심 시스템, 평균 속도가 감소하고 차량 수가 갑자기 증가할 때 교통 정체를 감지하고 경고하는 프로그램과 같습니다.

이것이 바로 Apache Spark가 작동하는 곳입니다.

여기에는 스트림 처리를 위한 기본 모듈이 있습니다. 스파크 구조적 스트리밍, Kafka에 연결하여 메시지를 처리할 수 있습니다.

환경 설정

필요한 것은 docker와 docker-compose뿐입니다.

다음 리포지토리를 기반으로 docker-compose 파일 구성을 사용합니다. 링크 스파크, 링크 카프카.

XNUMXD덴탈의 ./src 볼륨은 스크립트를 넣을 곳입니다.

환경을 시작하려면 다음을 실행하세요.

도커 - 작성

모든 코드는 여기에서 사용할 수 있습니다. GitHub 저장소.

Spark를 공부하기 시작했을 때 가장 마음에 들었던 점 중 하나는 Spark에 대해 작성된 코드와 일반적인 python+pandas 스크립트 간의 유사성이었습니다. 마이그레이션이 매우 쉬웠습니다.

동일한 논리에 따라 Spark의 스트리밍 모듈은 일반적인 Spark 코드와 매우 유사하므로 배치 애플리케이션에서 스트림 애플리케이션으로 쉽게 마이그레이션할 수 있습니다.

이에 따라 다음 섹션에서는 Spark 구조적 스트리밍의 특수성, 즉 어떤 새로운 기능이 있는지 학습하는 데 중점을 둘 것입니다.

우리의 첫 번째 직업

천천히 시작해서 장난감 예제를 만들어 봅시다

가장 먼저 해야 할 일은 Spark 작업이 메시지를 사용할 Kafka 주제를 만드는 것입니다.

이것은에 의해 수행 카프카 컨테이너 터미널 접속 그리고 실행:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

이 주제에 대해 생산자가 메시지를 작성하는 것을 시뮬레이션하기 위해 카프카 콘솔 생산자. 또한 컨테이너 내부에는 다음이 포함됩니다.

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

이제부터 터미널에 입력된 모든 줄은 테스트 주제에 메시지로 전송됩니다. 문자 ":"는 메시지의 키와 값(키:값)을 구분하는 데 사용됩니다.

이 주제를 사용하는 Spark 작업을 만들어 보겠습니다.

코드는 내부에 넣어야 합니다. /src/스트리밍 폴더(특별한 것은 없고 내가 선택한 폴더만 있음)

주목해야 할 중요한 점은 우리가 속성을 사용하고 있다는 것입니다. 읽기 스트림 쓰기 스트림, 일반적인 읽기 및 쓰기 대신. 이것이 Spark가 우리 작업을 스트리밍 애플리케이션으로 취급하게 만드는 주요 측면입니다.

Kafka에 연결하려면 서버와 주제를 지정해야 합니다. 옵션 startOffsets="early”는 Spark에게 주제를 처음부터 읽으라고 지시합니다. 또한 Kafka는 메시지를 다음 위치에 저장하기 때문에 형식으로 디코딩해야 합니다. .

다른 옵션에 대해서는 더 자세히 살펴보겠습니다.

이제 Spark 컨테이너에 액세스하여 작업을 실행해 보겠습니다.

스파크 제출 --패키지 org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

몇 초 동안 구성하면 주제가 사용되기 시작합니다.

Kafka의 Spark 소비 메시지. 작성자의 이미지.

스파크 스트리밍은 다음에서 작동합니다. 마이크로 배치 모드이며 이것이 바로 메시지를 사용할 때 "배치" 정보를 보는 이유입니다.

마이크로 배치는 모든 메시지가 도착할 때 개별적으로 처리되는 완전한 "진정한" 스트리밍과 데이터가 정적으로 유지되고 요청 시 소비되는 일반적인 배치 사이에 있습니다. Spark는 메시지를 함께 처리하기 위해 메시지를 축적하려고 잠시 기다리므로 오버헤드가 줄어들고 대기 시간이 늘어납니다. 이는 귀하의 필요에 맞게 조정될 수 있습니다.

나는 매우 빠른 타자기가 아니기 때문에 Spark는 현재 배치에 새 메시지를 포함하기 전에 메시지를 처리합니다.

그리고 그것은 우리의 첫 번째 스트리밍 작업이었습니다!

스트림 처리 작업을 코딩하는 것은 어렵지 않지만 몇 가지 문제가 있다는 느낌을 받으시기 바랍니다.

Kafka 스트림에 데이터 쓰기

이제 센서 데이터를 가지고 놀 차례입니다.

당신은 다운로드 할 수 있습니다 지퍼 2022년 XNUMX월의 파일을 다운로드하여 다음 위치에 추출합니다. /데이터 용량. 데이터는 원래 JSON 형식이며 약 23GB의 공간을 차지합니다. 가장 먼저 할 일은 디스크 공간과 읽기 시간을 최적화하기 위해 쪽모이 세공으로 변환하는 것입니다.

이를 수행하는 Spark 작업은 GitHub 저장소에 자세히 설명되어 있으므로 실행하기만 하면 됩니다.

스파크 제출 /src/transform_json_to_parquet.py스파크 제출 /src/join_parquet_files.py

컴퓨터에 따라 실행하는 데 다소 시간이 걸릴 수 있습니다. 그러나 최종 쪽모이 세공 파일 크기는 ~1Gb(20배 이상 작음)이며 읽기 속도가 훨씬 빠릅니다.

또한 메시지를 수신하려면 Kafka 주제를 생성해야 합니다.

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic 트래픽_센서

선택적으로 도착 메시지를 표시하려는 경우 콘솔 소비자를 설정할 수 있습니다.

kafka-console-consumer.sh --topic 트래픽_센서 --bootstrap-server localhost:9092

Kafka 주제에 대한 데이터를 작성하는 것은 쉽지만 몇 가지 세부 사항이 있습니다.

구조적 스트리밍에서 기본 동작은 데이터 스키마(열 및 해당 유형)를 추론하려고 시도하지 않는 것이므로 하나를 전달해야 합니다.

Kafka 메시지는 키-값 이진 문자열 쌍일 뿐이므로 데이터를 이 형식으로 표현해야 합니다. 이는 모든 행을 JSON 문자열로 변환하고, 이를 바이너리로 인코딩하고, 결과를 “값” 열에 저장함으로써 쉽게 달성할 수 있습니다.

열을 JSON 문자열로 변환합니다. 작성자의 이미지.

메시지 키는 Kafka에서 매우 중요하지만 테스트에서는 유용하지 않으므로 모든 메시지는 동일합니다.

앞서 언급했듯이 이 데이터 세트는 엄청나기 때문에 삽입되는 메시지 수를 500,000개로 제한했습니다.

마지막으로 Kafka 서버와 주제 및 "검문소위치” 스파크는 오류 복구에 유용한 실행 진행 상황을 저장합니다.

작업 실행:

스파크 제출 --패키지 org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Kafka에 데이터 삽입 작성자의 이미지.

왼쪽에서는 Spark 작업이 파일을 읽고, 오른쪽에서는 카프카 콘솔 소비자 도착 메시지를 표시합니다.

트래픽 주제가 입력되었으며 처리할 준비가 거의 완료되었습니다.

학습 목적으로만 주제를 채우기 위해 Spark 작업을 사용했다는 점을 기억하는 것이 중요합니다. 실제 시나리오에서는 센서 자체가 판독값을 Kafka로 직접 보냅니다.

이러한 동적 동작을 시뮬레이션하기 위해 아래 스크립트는 1초마다 주제에 2.5개의 행을 씁니다.

출력 모드 — 유형별 차량 수 계산

계속해서 유형별로 차량 수를 계산하는 작업을 만들어 보겠습니다.

"Classificação"(분류) 열에는 감지된 차량 유형이 포함됩니다.

주제를 읽으면서 JSON 바이너리 문자열을 다시 열 형식으로 변환해야 합니다.

이 작업이 완료되면 평소와 같이 쿼리를 작성할 수 있습니다. 쿼리 하트가 단지 고르다().그룹별().계산() 시퀀스, 나머지는 모두 스트리밍 로직과 관련이 있습니다.

이제 문제를 해결해야 할 때입니다. 출력 모드() 옵션.

스트림 애플리케이션의 출력 모드는 새 데이터가 도착할 때 결과를 (재)계산하고 쓰는 방법을 지정합니다.

세 가지 다른 값을 가정할 수 있습니다.

  • 추가: 출력에 새 레코드만 추가합니다.
  • 완료: 각각의 새 레코드에 대한 전체 결과를 다시 계산합니다.
  • 업데이트: 변경된 기록을 업데이트합니다.

이러한 모드는 작성된 애플리케이션에 따라 의미가 있을 수도 있고 없을 수도 있습니다. 예를 들어, 그룹화 또는 정렬이 수행되면 "완전" 모드가 의미가 없을 수 있습니다.

"완료" 모드에서 작업을 실행하고 결과를 살펴보겠습니다.

스파크 제출 --패키지 org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — 트럭, Automóvel-Car, Indefinido-Undefine, Ônibus-Bus, Moto-Motocycle. 작성자의 이미지.

새 레코드가 스트림에 삽입되면(오른쪽 터미널 참조) 작업이 전체 결과를 다시 계산합니다. 이는 순위나 경쟁과 같이 행 순서가 중요한 상황에서 유용할 수 있습니다.

그러나 그룹 수가 너무 크거나 개별 변경 사항이 전체 결과에 영향을 주지 않는 경우에는 이 접근 방식이 최적이 아닐 수 있습니다.

따라서 또 다른 옵션은 변경된 그룹에 대해서만 새 메시지를 생성하는 "업데이트" 출력 모드를 사용하는 것입니다. 아래를 참조하세요:

출력 모드가 "업데이트"인 쿼리입니다. 작성자의 이미지.

그룹화가 포함된 쿼리에는 "추가" 모드를 사용할 수 없으므로 동일한 작업을 사용하여 표시할 수 없습니다. 하지만 가장 간단한 모드인 것 같아요. 항상 출력에 새 레코드를 추가합니다.

결과를 테이블에 저장하는 것을 고려하면 이러한 출력 모드를 더 쉽게 이해할 수 있습니다. 전체 출력 모드에서는 처리된 모든 새 메시지에 대해 테이블이 다시 작성되고, 업데이트 모드에서는 일부 업데이트가 발생한 행만 다시 작성되며 추가 시 항상 끝에 새 행이 추가됩니다.

텀블링 시간 창 — 시간 간격을 사용하여 집계

스트리밍 시스템에서 메시지에는 메시지와 관련된 두 가지 다른 타임스탬프가 있습니다. 이벤트 시간 — 메시지가 생성된 시간(우리의 경우 센서 판독 시간) 처리 시간 — 처리 에이전트가 메시지를 읽는 시간(우리의 경우 스파크에 도달합니다.

스트림 처리 도구의 중요한 기능은 이벤트 시간 처리를 처리하는 기능입니다. 연속 창은 이벤트 시간 열을 사용하여 집계를 만드는 데 사용되는 겹치지 않는 고정 시간 간격입니다. 더 간단히 말하면 타임라인을 동일한 크기의 조각으로 분할하여 각 이벤트가 단일 간격에 속하도록 합니다.

예를 들어, 지난 5분 동안 감지된 차량 수를 5분마다 계산합니다.

5분 텀블링 창. 작성자의 이미지.

아래 코드는 이를 보여줍니다.

이러한 종류의 처리는 많은 상황에서 매우 유용할 수 있습니다. 앞서 제안된 교통 정체 감지기로 돌아가서, 가능한 접근 방식 중 하나는 10분 동안 차량의 평균 속도를 측정하고 해당 속도가 특정 임계값보다 낮은지 확인하는 것입니다.

이벤트 시간 처리는 복잡한 주제입니다. 메시지가 손실되거나, 너무 늦게 도착하거나, 순서가 어긋나는 등 문제를 처리할 때 모든 일이 발생할 수 있습니다. Spark에는 다음과 같은 문제를 완화하기 위한 여러 가지 메커니즘이 있습니다. 워터 마크, 우리는 그것에 집중하지 않을 것입니다.

기간은 다음 열의 다른 열과 함께 사용할 수도 있습니다. 그룹바이(). 아래 예는 5분 동안 유형별 차량 수를 계산합니다.

슬라이딩 시간 창 — 시간 간격의 유연성

슬라이딩 시간 창은 텀블링 창을 유연하게 만든 것입니다. 겹치지 않는 간격을 만드는 대신 각 간격이 생성되는 빈도를 정의할 수 있습니다.

예를 들어 5분마다 지난 30분 동안 감지된 차량 수를 계산합니다.

따라서 이벤트는 여러 간격에 속할 수 있으며 필요한 만큼 횟수가 계산될 수 있습니다.

슬라이딩 윈도우를 정의하려면 업데이트 간격을 () 기능.

출력을 살펴보겠습니다.

보시다시피 30분마다 5분 창이 생성됩니다.

이러한 유연성은 보다 구체적인 비즈니스 규칙과 보다 복잡한 트리거를 정의하는 데 매우 유용할 수 있습니다. 예를 들어, 당사의 교통 정체 감지기는 지난 5분 동안 10초마다 응답을 보내고 평균 자동차 속도가 20km/h 미만으로 떨어지면 경고를 생성할 수 있습니다.

Spark Structured Streaming의 주요 개념과 이를 Kafka에 적용하는 방법을 빠르게 살펴보았습니다.

Apache Kafka와 Apache Spark는 많은 회사에서 매일 엄청난 양의 데이터를 처리하는 데 사용하는 안정적이고 강력한 도구로, 스트림 처리 작업에서 가장 강력한 쌍 중 하나입니다.

Spark 작업을 사용하여 Kafka 주제를 채우고, 사용하고, 처리하는 방법을 배웠습니다. 게시물에서 언급한 것처럼 이것은 어려운 작업이 아니었습니다. 스트림 처리 API는 약간의 조정만 제외하면 일반적인 배치 API와 거의 동일합니다.

또한 다양한 출력 모드, 스트림 애플리케이션에 특정한 내용 및 각 모드를 사용하는 방법에 대해서도 논의했습니다. 마지막으로 스트림 처리의 주요 기능 중 하나인 시간 창을 사용하여 집계를 살펴보았습니다.

다시 한 번 말씀드리지만, 이는 간단히 살펴보는 것이었습니다. 더 자세히 살펴보고 싶으시면 아래에 몇 가지 참고 자료를 남겨 두겠습니다.

조금이나마 도움이 되었기를 바라며 읽어주셔서 감사합니다! 🙂

모든 코드는 여기에서 사용할 수 있습니다. GitHub 저장소.
사용된 데이터 —
Contagens Volumétricas de Radares, 오픈 데이터, 브라질 주지사

[1] 기능 심층 분석: Apache Spark 구조적 스트리밍의 워터마킹 — Databricks 블로그의 Max Fisher
[2] 챔버스, B., & 자하리아, M. (2018). Spark: 최종 가이드: 빅 데이터 처리가 간편해졌습니다. “오라일리 미디어, Inc.”.
[3] Apache Kafka를 사용한 실시간 물류, 배송 및 운송— 카이 웨너
[4] Netflix 스튜디오 및 금융 세계에서 Apache Kafka 소개 — 컨플루언트 블로그
[5] 스파크 스트리밍 및 Kafka — https://sparkbyexamples.com/

소스 https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4에서 https:/를 통해 다시 게시된 Spark 구조적 스트리밍 + Kafka에 대한 간략한 살펴보기 /towardsdatascience.com/feed

<!–

–>

타임 스탬프 :

더보기 블록 체인 컨설턴트