Uma rápida olhada no Spark Structured Streaming + Kafka

Aprendendo o básico sobre como usar esta dupla poderosa para tarefas de processamento de fluxo

Foto por Nikhita Singhal on Unsplash

Recentemente comecei a estudar bastante sobre Apache Kafka e Apache Spark, duas tecnologias líderes no mundo da engenharia de dados.

Fiz vários projetos utilizando-os nos últimos meses; “Streaming de aprendizado de máquina com Kafka, Debezium e BentoML”é um exemplo. Meu foco é aprender como criar pipelines de dados poderosos com essas ferramentas modernas e famosas e ter uma noção de suas vantagens e desvantagens.

Nos últimos meses, já falei sobre como criar pipelines de ETL usando as duas ferramentas, mas nunca juntas, e essa é a lacuna que irei preencher hoje.

Nosso objetivo é aprender a ideia geral por trás da construção de um aplicativo de streaming com Spark+Kafka e dar uma visão rápida de seus principais conceitos usando dados reais.

A ideia é simples — Apache Kafka é uma ferramenta de streaming de mensagens, onde os produtores escrevem mensagens em uma extremidade de uma fila (chamada de tópico) para serem lidos pelos consumidores, por outro.

Mas é uma ferramenta muito complexa, construída para ser um serviço de mensagens distribuídas resiliente, com todos os tipos de garantias de entrega (exatamente uma vez, uma vez, qualquer), armazenamento de mensagens e replicação de mensagens, ao mesmo tempo que permite flexibilidade, escalabilidade e alto rendimento. Possui um conjunto mais amplo de casos de uso, como comunicação de microsserviços, sistemas de eventos em tempo real e pipelines de ETL de streaming.

Apache Spark é um mecanismo de transformação de dados baseado em memória distribuída.

É também uma ferramenta muito complexa, capaz de se conectar a todos os tipos de bancos de dados, sistemas de arquivos e infraestrutura em nuvem. É voltado para operar em ambientes distribuídos para paralelizar o processamento entre máquinas, alcançando transformações de alto desempenho por meio de sua filosofia de avaliação lenta e otimizações de consultas.

A parte legal disso é que, no final do dia, o código é apenas sua consulta SQL usual ou (quase) seu script Python + pandas, com toda a bruxaria abstraída em uma API de alto nível amigável e agradável.

Junte essas duas tecnologias e teremos a combinação perfeita para construir um pipeline de ETL de streaming.

Utilizaremos os dados dos sensores de trânsito da cidade de Belo Horizonte (BH), capital de Minas Gerais (Brasil). É um enorme conjunto de dados que contém medições do fluxo de tráfego em vários locais da cidade. Cada sensor detecta periodicamente o tipo de veículo que circula naquele local (carro, moto, ônibus/caminhão), sua velocidade e comprimento (e outras informações que não vamos utilizar).

Este conjunto de dados representa precisamente uma das aplicações clássicas para sistemas de streaming – um grupo de sensores que enviam suas leituras continuamente a partir do campo.

Neste cenário, o Apache Kafka pode ser utilizado como uma camada de abstração entre os sensores e as aplicações que consomem seus dados.

Kafka usado como camada de abstração entre fontes e serviços. Imagem do autor.

Com este tipo de infraestrutura é possível construir todo tipo de (os chamados) sistemas orientados a eventos em tempo real, como um programa para detectar e alertar sobre engarrafamentos quando o número de veículos aumenta repentinamente com uma queda na velocidade média.

E é aí que o Apache Spark entra em ação.

Possui um módulo nativo para processamento de stream chamado Streaming Estruturado Spark, que pode se conectar ao Kafka e processar suas mensagens.

Configurando o ambiente

Tudo que você precisa é docker e docker-compose.

Usaremos uma configuração de arquivo docker-compose baseada nos seguintes repositórios: faísca de ligação, link kafka.

A ./src volume é onde colocaremos nossos scripts.

Para iniciar o ambiente, basta executar

docker-compose up

Todo o código está disponível neste Repositório GitHub.

Uma das coisas que mais gostei quando comecei a estudar o Spark foi a semelhança entre o código escrito para ele e meus scripts habituais em python+pandas. Foi muito fácil migrar.

Seguindo a mesma lógica, o módulo de streaming do Spark é muito semelhante ao código spark usual, facilitando a migração das aplicações em lote para as de fluxo.

Dito isso, nas seções a seguir nos concentraremos em aprender as especificidades do streaming estruturado Spark, ou seja, quais novos recursos ele possui.

Nosso primeiro trabalho

Vamos começar devagar e construir um exemplo de brinquedo

A primeira coisa a fazer é criar um tópico Kafka de onde nosso spark job consumirá as mensagens.

Isso é feito por acessando o terminal de contêineres Kafka e executando:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Para simular um produtor escrevendo mensagens sobre este tema, vamos usar o produtor de console kafka. Também dentro do contêiner:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

A partir de agora, cada linha digitada no terminal será enviada como mensagem para o tópico de teste. O caractere “:” é usado para separar a chave e o valor da mensagem (chave:valor).

Vamos criar um job do Spark para consumir este tópico.

O código precisa ser colocado dentro do /src/streaming pasta (nada de especial, apenas a pasta que escolhi).

O principal a observar é que estamos usando os atributos lerStream e escreverStream, em vez de leitura e gravação normais. Este é o principal aspecto que faz com que o Spark trate nosso trabalho como uma aplicação de streaming.

Para conectar-se ao Kafka, é necessário especificar o servidor e o tópico. A opção startOffsets=“mais cedo” diz ao Spark para ler o tópico desde o início. Além disso, como Kafka armazena suas mensagens em binário forma, eles precisam ser decodificados para corda.

As outras opções serão mais exploradas.

Agora, vamos acessar o contêiner do Spark e executar o trabalho.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Após alguns segundos de configuração, ele começará a consumir o tópico.

Spark consumindo mensagens de Kafka. Imagem do autor.

Spark Streaming funciona em microlote modo, e é por isso que vemos as informações do “lote” quando ele consome as mensagens.

O microlote está entre o streaming “verdadeiro” completo, onde todas as mensagens são processadas individualmente à medida que chegam, e o lote normal, onde os dados permanecem estáticos e são consumidos sob demanda. O Spark esperará algum tempo tentando acumular mensagens para processá-las juntas, reduzindo a sobrecarga e aumentando a latência. Isso pode ser ajustado às suas necessidades.

Não sou um digitador super rápido, então o Spark processa a mensagem antes que eu possa incluir novas no lote atual.

E esse foi o nosso primeiro trabalho de streaming!

Espero que você tenha entendido: não é difícil codificar um trabalho de processamento de fluxo, mas existem algumas pegadinhas.

Gravando dados em um fluxo Kafka

Agora é hora de começar a brincar com os dados do sensor.

Você pode baixar o zip arquivo de AGOSTO DE 2022 e extraia-o para o /dados volume. Os dados estão originalmente em JSON e ocupam cerca de 23 Gb de espaço. A primeira coisa a fazer é convertê-lo para parquet para otimizar o espaço em disco e o tempo de leitura.

Os spark jobs para fazer isso estão detalhados no repositório GitHub, tudo que você precisa fazer é executá-los:

spark-submit /src/transform_json_to_parquet.pyspark-submit /src/join_parquet_files.py

Dependendo da sua máquina, a execução pode demorar algum tempo. Mas vale a pena, o tamanho final do arquivo parquet é de aproximadamente 1 Gb (mais de 20x menor) e muito mais rápido de ler.

Também precisamos criar o tópico Kafka para receber nossas mensagens:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic tráfego_sensor

Opcionalmente, se quiser exibir as mensagens que chegam, é possível configurar um consumidor de console.

kafka-console-consumer.sh --topic tráfego_sensor --bootstrap-server localhost:9092

Escrever dados sobre um tópico Kafka é fácil, mas contém alguns detalhes.

No streaming estruturado, o comportamento padrão é não tentar inferir o esquema de dados (colunas e seus tipos), por isso precisamos passar um.

As mensagens Kafka são apenas pares de strings binárias de valor-chave, portanto, precisamos representar nossos dados neste formato. Isso pode ser facilmente alcançado convertendo todas as linhas em strings JSON, codificando-as em binário e armazenando o resultado na coluna “valor”.

Transformando colunas em strings JSON. Imagem do autor.

As chaves de mensagens são muito importantes no Kafka, mas não serão úteis em nossos testes, portanto todas as mensagens serão iguais.

Como mencionei antes, este conjunto de dados é ENORME, então limitei o número de mensagens inseridas a 500,000.

Por fim, passamos o servidor e o tópico Kafka e um “ponto de verificaçãoLocalização”onde o spark armazenará o progresso da execução, útil para se recuperar de erros.

Executando o trabalho:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Inserindo dados no Kafka. Imagem do autor.

À esquerda, o trabalho do Spark lê o arquivo, à direita, um kafka-console-consumidor exibe as mensagens recebidas.

Nosso tópico de tráfego está preenchido e quase pronto para ser processado.

É importante lembrar que usamos um spark job para preencher nosso tópico apenas para fins de aprendizagem. Num cenário real, os próprios sensores enviarão leituras diretamente para Kafka.

Para simular esse comportamento dinâmico, o script abaixo grava 1 linha no tópico a cada 2.5 segundos.

Modos de saída — Contando o número de veículos por tipo

Seguindo em frente, vamos criar um trabalho para contar o número de veículos por tipo.

A coluna “Classificação” contém o tipo de veículo detectado.

À medida que lemos o tópico, precisamos converter as strings binárias JSON de volta para o formato colunar.

Feito isso, a consulta pode ser construída normalmente. É interessante notar que o coração da consulta é apenas o selecionar().groupB().contar() sequência, todo o resto é relativo à lógica de streaming.

Então é hora de abordar o modo de saída() opção.

O modo de saída de um aplicativo de fluxo especifica como queremos (re)calcular e gravar os resultados à medida que novos dados chegam.

Pode assumir três valores diferentes:

  • Acrescentar: adicione apenas novos registros à saída.
  • Preencha: recalcular o resultado completo para cada novo registro.
  • Atualizar: Atualizar registros alterados.

Esses modos podem ou não fazer sentido dependendo do aplicativo escrito. Por exemplo, o modo “completo” pode não fazer sentido se for realizado qualquer agrupamento ou classificação.

Vamos executar o trabalho no modo “completo” e ver os resultados.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Caminhão, Automóvel-Carro, Indefinido-Indefinido, Ônibus-Ônibus, Moto-Motociclo. Imagem do autor.

À medida que novos registros são inseridos no fluxo (veja o terminal à direita), a tarefa recalcula o resultado completo. Isso pode ser útil em situações em que a ordem das linhas é importante, como classificação ou competição.

No entanto, esta abordagem pode não ser a ideal se o número de grupos for demasiado grande ou se as alterações individuais não afetarem o resultado global.

Assim, outra opção é utilizar o modo de saída “atualização”, que gera uma nova mensagem apenas para os grupos que foram alterados. Veja abaixo:

A consulta com modo de saída “atualização”. Imagem do autor.

O modo “anexar” não está disponível para consultas com agrupamento, portanto não poderei mostrar usando o mesmo trabalho. Mas acho que é o modo mais simples, sempre adiciona um novo registro à saída.

Esses modos de saída são mais simples de entender se você pensar em salvar os resultados em uma tabela. No modo de saída completo, a tabela será reescrita a cada nova mensagem processada, no modo de atualização, apenas as linhas onde ocorreu alguma atualização, e o acréscimo sempre adicionará uma nova linha ao final.

Janela de tempo decrescente — Agregação usando intervalos de tempo

Em sistemas de streaming, as mensagens possuem dois carimbos de data/hora diferentes relacionados a elas: Hora do evento — A hora em que a mensagem foi criada, no nosso caso o tempo de leitura do sensor, e Hora do processamento — Quando a mensagem é lida pelo agente de processamento, no nosso caso quando atinge Spark.

Um recurso importante das ferramentas de processamento de fluxo é a capacidade de lidar com o processamento de tempo de evento. Janelas em cascata são intervalos de tempo fixos não sobrepostos usados ​​para fazer agregações usando colunas de tempo de evento. Para simplificar, eles dividem a linha do tempo em fatias de tamanhos iguais, de modo que cada evento pertença a um único intervalo.

Por exemplo, conte, a cada 5 minutos, quantos veículos foram detectados nos últimos 5 minutos.

Janela caindo de 5 minutos. Imagem do autor.

O código abaixo ilustra isso:

Este tipo de processamento pode ser extremamente útil em muitas situações. Voltando ao detector de engarrafamentos proposto anteriormente, uma abordagem possível é medir a velocidade média dos veículos numa janela de 10 minutos e ver se está abaixo de um determinado limite.

O processamento em tempo de evento é um tópico complexo. Tudo pode acontecer ao lidar com isso, como perda de mensagens, chegada tarde demais ou falha. O Spark possui vários mecanismos para tentar mitigar os problemas, como marcas d'água, nos quais não vamos nos concentrar.

As janelas de tempo também podem ser usadas em conjunto com outras colunas na grupoBy(). O exemplo abaixo conta o número de veículos por tipo em uma janela de 5 minutos.

Janela de tempo móvel — Flexibilização nos intervalos de tempo

As janelas de tempo deslizantes são uma flexibilização das janelas giratórias. Em vez de criar intervalos não sobrepostos, permitem definir com que frequência cada intervalo será criado.

Por exemplo, a cada 5 minutos, conte quantos veículos foram detectados nos últimos 30 minutos.

Por isso, os eventos podem pertencer a vários intervalos e ser contados quantas vezes forem necessárias.

Para definir uma janela deslizante, basta passar o intervalo de atualização para o janela() função.

Vamos ver o resultado.

Como podemos ver, temos janelas de 30min sendo criadas a cada 5min.

Essa flexibilidade pode ser bastante útil para definir regras de negócio mais específicas e gatilhos mais complexos. Por exemplo, nosso detector de engarrafamentos pode enviar respostas a cada 5 segundos sobre os últimos 10 minutos e criar um alerta quando a velocidade média do carro cair abaixo de 20 km/h.

Esta foi uma rápida visão dos principais conceitos do Spark Structured Streaming e como eles podem ser aplicados com o Kafka.

Apache Kafka e Apache Spark são ferramentas confiáveis ​​e robustas usadas por muitas empresas para processar diariamente quantidades incríveis de dados, tornando-os um dos pares mais fortes na tarefa de processamento de fluxo.

Aprendemos como preencher, consumir e processar tópicos do Kafka usando trabalhos do Spark. Não foi uma tarefa difícil, como mencionado no post, a API de processamento de stream é quase igual à API de lote normal, com apenas alguns pequenos ajustes.

Também discutimos diferentes modos de saída, algo específico para aplicativos de streaming e como cada um pode ser usado. Por último, mas não menos importante, exploramos agregações com janelas de tempo, uma das principais capacidades do processamento de fluxo.

Novamente, esta foi uma olhada rápida e deixarei algumas referências abaixo se você quiser explorar mais a fundo.

Espero ter ajudado de alguma forma, obrigado pela leitura! 🙂

Todo o código está disponível neste Repositório GitHub.
Dados usados ​​—
Contagens Volumétricas de Radares, dados abertos, governador brasileiro.

[1] Aprofundamento dos recursos: Marca d'água no streaming estruturado do Apache Spark - Max Fisher no blog Databricks
[2] Chambers, B. e Zaharia, M. (2018). Spark: o guia definitivo: processamento de big data simplificado. “O'Reilly Media, Inc.”.
[3] Logística, remessa e transporte em tempo real com Apache Kafka-Kai Waehner
[4] Apresentando Apache Kafka no Netflix Studio e Finance World - Blog confluente
[5] Spark Streaming e Kafka - https://sparkbyexamples.com/

Uma rápida olhada no Spark Structured Streaming + Kafka republicado da fonte https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 via https:/ /towardsdatascience.com/feed

<!–

->

Carimbo de hora:

Mais de Consultores Blockchain