Una mirada rápida a Spark Structured Streaming + Kafka

Aprendiendo los conceptos básicos de cómo utilizar este poderoso dúo para tareas de procesamiento de flujo

Foto por Nikhita Singhal on Unsplash

Recientemente comencé a estudiar mucho sobre Apache Kafka y Apache Spark, dos tecnologías líderes en el mundo de la ingeniería de datos.

He realizado varios proyectos usándolos en los últimos meses; “Transmisión de aprendizaje automático con Kafka, Debezium y BentoML” es un ejemplo. Mi objetivo es aprender cómo crear potentes canales de datos con estas famosas herramientas modernas y tener una idea de sus ventajas y desventajas.

En los últimos meses, ya cubrí cómo crear canalizaciones ETL usando ambas herramientas pero nunca usándolas juntas, y ese es el vacío que llenaré hoy.

Nuestro objetivo es aprender la idea general detrás de la construcción de una aplicación de streaming con Spark+Kafka y dar un vistazo rápido a sus conceptos principales utilizando datos reales.

La idea es simple: Apache Kafka es una herramienta de transmisión de mensajes, donde los productores escriben mensajes en un extremo de una cola (llamada tema) para ser leído por los consumidores, por el otro.

Pero es una herramienta muy compleja, diseñada para ser un servicio de mensajería distribuida resistente, con todo tipo de garantías de entrega (exactamente una vez, una vez, cualquiera), almacenamiento y replicación de mensajes, al tiempo que permite flexibilidad, escalabilidad y alto rendimiento. Tiene un conjunto más amplio de casos de uso, como comunicación de microservicios, sistemas de eventos en tiempo real y canales de transmisión ETL.

Apache Spark es un motor de transformación de datos basado en memoria distribuida.

También es una herramienta muy compleja, capaz de conectarse con todo tipo de bases de datos, sistemas de archivos e infraestructura de nube. Está diseñado para operar en entornos distribuidos para paralelizar el procesamiento entre máquinas, logrando transformaciones de alto rendimiento mediante el uso de su filosofía de evaluación diferida y optimizaciones de consultas.

Lo bueno de esto es que, al final del día, el código es solo su consulta SQL habitual o (casi) su script Python+pandas, con toda la brujería abstraída bajo una API de alto nivel fácil de usar.

Unamos estas dos tecnologías y tendremos la combinación perfecta para construir un canal ETL de streaming.

Usaremos los datos de los sensores de tráfico en la ciudad de Belo Horizonte (BH), la capital de Minas Gerais (Brasil). Es un enorme conjunto de datos que contiene mediciones del flujo de tráfico en varios lugares de la ciudad. Cada sensor detecta periódicamente el tipo de vehículo que circula en esa ubicación (coche, moto, autobús/camión), su velocidad y longitud (y otra información que no vamos a utilizar).

Este conjunto de datos representa precisamente una de las aplicaciones clásicas de los sistemas de transmisión: un grupo de sensores que envían sus lecturas continuamente desde el campo.

En este escenario, Apache Kafka se puede utilizar como capa de abstracción entre los sensores y las aplicaciones que consumen sus datos.

Kafka lo utiliza como capa de abstracción entre fuentes y servicios. Imagen del autor.

Con este tipo de infraestructura es posible construir todo tipo de (las llamadas) sistemas controlados por eventos en tiempo real, como un programa para detectar y alertar de atascos cuando el número de vehículos aumenta repentinamente con una caída de la velocidad media.

Y ahí es donde entra en juego Apache Spark.

Tiene un módulo nativo para el procesamiento de transmisiones llamado Transmisión estructurada de Spark, que puede conectarse a Kafka y procesar sus mensajes.

Configurando el medio ambiente

Todo lo que necesitas es Docker y Docker-Compose.

Usaremos una configuración de archivo Docker-Compose basada en los siguientes repositorios: chispa de enlace, enlace kafka.

El ./origen El volumen es donde vamos a poner nuestros guiones.

Para iniciar el entorno, simplemente ejecute

docker-componer

Todo el código está disponible en este Repositorio GitHub.

Una de las cosas que más me gustó cuando comencé a estudiar Spark fue la similitud entre el código escrito y mis scripts habituales de Python+pandas. Fue muy fácil migrar.

Siguiendo la misma lógica, el módulo de transmisión de Spark es muy similar al código Spark habitual, lo que facilita la migración de las aplicaciones por lotes a las de transmisión.

Dicho esto, en las siguientes secciones, nos centraremos en conocer las especificidades de la transmisión estructurada de Spark, es decir, qué nuevas características tiene.

nuestro primer trabajo

Comencemos despacio y construyamos un ejemplo de juguete.

Lo primero que debemos hacer es crear un tema de Kafka desde donde nuestro trabajo Spark consumirá los mensajes.

Esto se hace por acceder a la terminal de contenedores de Kafka y ejecutando:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

Para simular un productor escribiendo mensajes sobre este tema, usemos el productor-consola-kafka. También dentro del contenedor:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

A partir de ahora, cada línea escrita en la terminal se enviará como un mensaje al tema de prueba. El carácter “:” se utiliza para separar la clave y el valor del mensaje (clave:valor).

Creemos un trabajo de Spark para consumir este tema.

El código debe colocarse dentro del /src/transmisión carpeta (nada especial, solo la carpeta que elegí).

La cosa clave a tener en cuenta es que estamos usando los atributos leerStream y escribir flujo, en lugar de lectura y escritura normales. Este es el aspecto principal que hace que Spark trate nuestro trabajo como una aplicación de streaming.

Para conectarse a Kafka, es necesario especificar el servidor y el tema. La opción compensaciones iniciales = “lo antes posible” le dice a Spark que lea el tema desde el principio. Además, debido a que Kafka almacena sus mensajes en binario forma, necesitan ser decodificados para cadena.

Las otras opciones se explorarán más a fondo.

Ahora, accedamos al contenedor Spark y ejecutemos el trabajo.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

Luego de unos segundos de configuración, comenzará a consumir el tema.

Chispa que consume mensajes de Kafka. Imagen del autor.

Spark Streaming funciona en micro-lotes modo, y es por eso que vemos la información de "lote" cuando consume los mensajes.

El microprocesamiento por lotes se sitúa entre el streaming “verdadero” completo, en el que todos los mensajes se procesan individualmente a medida que llegan, y el lote habitual, en el que los datos permanecen estáticos y se consumen según demanda. Spark esperará algún tiempo intentando acumular mensajes para procesarlos juntos, reduciendo la sobrecarga y aumentando la latencia. Esto se puede ajustar a sus necesidades.

No escribo muy rápido, por lo que Spark procesa el mensaje antes de que pueda incluir otros nuevos en el lote actual.

¡Y ese fue nuestro primer trabajo de streaming!

Espero que tengas la sensación: no es difícil codificar un trabajo de procesamiento de flujo, pero hay algunas trampas.

Escribir datos en una secuencia de Kafka

Ahora es el momento de empezar a jugar con los datos del sensor.

Usted puede descargar el Código Postal archivo de AGOSTO de 2022 y extráigalo en el /datos volumen. Los datos están originalmente en JSON y ocupan alrededor de 23 Gb de espacio. Lo primero que debemos hacer es convertirlo a parquet para optimizar el espacio en disco y el tiempo de lectura.

Los trabajos de Spark para hacer esto se detallan en el repositorio de GitHub, todo lo que necesitas hacer es ejecutarlos:

envío de chispa /src/transform_json_to_parquet.pyenvío de chispa /src/join_parquet_files.py

Dependiendo de su máquina, la ejecución puede tardar algún tiempo. Pero vale la pena, el tamaño final del archivo parquet es ~1Gb (más de 20 veces más pequeño) y mucho más rápido de leer.

También necesitamos crear el tema de Kafka para recibir nuestros mensajes:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic Traffic_sensor

Opcionalmente, si desea mostrar los mensajes que llegan, es posible configurar un consumidor de consola.

kafka-console-consumer.sh --tema Traffic_sensor --bootstrap-server localhost:9092

Escribir datos sobre un tema de Kafka es fácil, pero tiene algunos detalles.

En la transmisión estructurada, el comportamiento predeterminado es no intentar inferir el esquema de datos (columnas y sus tipos), por lo que debemos pasar uno.

Los mensajes de Kafka son solo pares de cadenas binarias clave-valor, por lo que debemos representar nuestros datos en este formato. Esto se puede lograr fácilmente convirtiendo todas las filas en cadenas JSON, codificándolas en binario y almacenando el resultado en la columna "valor".

Transformando columnas a cadenas JSON. Imagen del autor.

Las claves de mensajes son muy importantes en Kafka, pero no serán útiles en nuestras pruebas, por lo que todos los mensajes tendrán las mismas.

Como se mencionó anteriormente, este conjunto de datos es ENORME, por lo que limité la cantidad de mensajes insertados a 500,000.

Finalmente, pasamos el servidor y tema de Kafka y un “punto de controlUbicación”donde Spark almacenará el progreso de la ejecución, útil para recuperarse de errores.

Ejecutando el trabajo:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Insertar datos en Kafka. Imagen del autor.

A la izquierda, el trabajo de Spark lee el archivo, a la derecha, un consola-kafka-consumidor muestra los mensajes que llegan.

Nuestro tema de tráfico está completo y casi listo para ser procesado.

Es importante recordar que utilizamos un trabajo de chispa para completar nuestro tema solo con fines de aprendizaje. En un escenario real, los propios sensores enviarán lecturas directamente a Kafka.

Para simular este comportamiento dinámico, el siguiente script escribe 1 fila en el tema cada 2.5 segundos.

Modos de salida: contar el número de vehículos por tipo

Continuando, creemos un trabajo para contar la cantidad de vehículos por tipo.

La columna “Classificação” (Clasificación) contiene el tipo de vehículo detectado.

Mientras leemos el tema, necesitamos convertir las cadenas binarias JSON nuevamente al formato de columnas.

Una vez hecho esto, la consulta se puede construir como de costumbre. Es interesante notar que el centro de consulta es solo el selecciona().agrupar por().contar() secuencia, todo el resto es relativo a la lógica de transmisión.

Así que es hora de abordar el modo de salida() opción.

El modo de salida de una aplicación de flujo especifica cómo queremos (re)calcular y escribir los resultados a medida que llegan nuevos datos.

Puede asumir tres valores diferentes:

  • Adjuntar: Solo agrega nuevos registros a la salida.
  • Solución: Vuelva a calcular el resultado completo para cada nuevo registro.
  • Actualizar: Actualizar registros modificados.

Estos modos pueden o no tener sentido dependiendo de la aplicación escrita. Por ejemplo, el modo "completo" puede no tener sentido si se realiza alguna agrupación u ordenación.

Ejecutemos el trabajo en modo "completo" y observemos los resultados.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão — Camión, Automóvel-Car, Indefinido-Undefinido, Ônibus-Bus, Moto-Motocycle. Imagen del autor.

A medida que se insertan nuevos registros en la secuencia (consulte la terminal a la derecha), el trabajo vuelve a calcular el resultado completo. Esto puede resultar útil en situaciones en las que el orden de las filas es importante, como la clasificación o la competencia.

Sin embargo, este enfoque puede no ser óptimo si el número de grupos es demasiado grande o los cambios individuales no afectan el resultado general.

Entonces, otra opción es usar el modo de salida “actualizar”, que genera un nuevo mensaje solo para los grupos que han cambiado. Vea abajo:

La consulta con modo de salida “actualizar”. Imagen del autor.

El modo “añadir” no está disponible para consultas con agrupación, por lo que no podré mostrar usando el mismo trabajo. Pero creo que es el modo más simple, hacerlo agrega un nuevo registro a la salida.

Estos modos de salida son más sencillos de entender si piensa en guardar los resultados en una tabla. En el modo de salida completa, la tabla se reescribirá para cada nuevo mensaje procesado, en el modo de actualización, solo las líneas donde ocurrió alguna actualización, y el anexo siempre agregará una nueva línea al final.

Ventana de tiempo de caída: agregación mediante intervalos de tiempo

En los sistemas de transmisión, los mensajes tienen dos marcas de tiempo diferentes relacionadas con ellos: Hora del evento: la hora en que se creó el mensaje, en nuestro caso, la hora de lectura del sensor, y Hora de procesamiento: cuando el agente de procesamiento lee el mensaje, en nuestro caso, cuando llega a Spark.

Una característica importante de las herramientas de procesamiento de transmisiones es la capacidad de manejar el procesamiento del tiempo de eventos. Las ventanas giratorias son intervalos de tiempo fijos que no se superponen y que se utilizan para realizar agregaciones mediante columnas de tiempo de evento. Para decirlo de manera más simple, dividen la línea de tiempo en porciones del mismo tamaño para que cada evento pertenezca a un único intervalo.

Por ejemplo, cuente, cada 5 minutos, cuántos vehículos se detectaron en los últimos 5 minutos.

Ventana de caída de 5 minutos. Imagen del autor.

El siguiente código ilustra esto:

Este tipo de procesamiento puede resultar extremadamente útil en muchas situaciones. Volviendo al detector de atascos propuesto anteriormente, un posible enfoque es medir la velocidad promedio de los vehículos en una ventana de 10 minutos y ver si está por debajo de un cierto umbral.

El procesamiento en el momento del evento es un tema complejo. Al lidiar con ello, puede pasar de todo, como que los mensajes se pierdan, lleguen demasiado tarde o se estropeen. Spark tiene varios mecanismos para intentar mitigar los problemas, como marcas de agua, en el que no nos centraremos.

Las ventanas de tiempo también se pueden utilizar junto con otras columnas en el agrupar por(). El siguiente ejemplo cuenta la cantidad de vehículos por tipo en una ventana de 5 minutos.

Ventana temporal móvil: flexibilización de los intervalos temporales

Las ventanas de tiempo móviles son una flexibilización de las ventanas temporales. En lugar de crear intervalos que no se superpongan, permiten definir con qué frecuencia se creará cada intervalo.

Por ejemplo, cada 5 minutos, cuente cuántos vehículos se detectaron en los últimos 30 minutos.

Por eso, los eventos pueden pertenecer a muchos intervalos y contarse tantas veces como sea necesario.

Para definir una ventana deslizante, simplemente pase el intervalo de actualización al ventana() función.

Veamos el resultado.

Como podemos ver, se crean ventanas de 30 minutos cada 5 minutos.

Esta flexibilidad puede resultar muy útil para definir reglas comerciales más específicas y desencadenantes más complejos. Por ejemplo, nuestro detector de atascos de tráfico puede enviar respuestas cada 5 segundos durante los últimos 10 minutos y crear una alerta cuando la velocidad promedio del automóvil cae por debajo de 20 km/h.

Este fue un vistazo rápido a los conceptos principales de Spark Structured Streaming y cómo se pueden aplicar con Kafka.

Apache Kafka y Apache Spark son herramientas confiables y robustas utilizadas por muchas empresas para procesar diariamente cantidades increíbles de datos, lo que las convierte en una de las parejas más sólidas en la tarea de procesamiento de flujo.

Hemos aprendido cómo completar, consumir y procesar temas de Kafka utilizando trabajos de Spark. Esta no fue una tarea difícil, como se menciona en la publicación, la API de procesamiento de flujo es casi igual a la API por lotes habitual, con solo algunos ajustes menores.

También analizamos diferentes modos de salida, algo específico para aplicaciones de transmisión y cómo se puede usar cada uno. Por último, pero no menos importante, exploramos agregaciones con ventanas de tiempo, una de las principales capacidades del procesamiento de transmisiones.

Nuevamente, este fue un vistazo rápido y dejaré algunas referencias a continuación si deseas explorar más profundamente.

Espero haber ayudado de alguna manera, ¡gracias por leer! 🙂

Todo el código está disponible en este Repositorio GitHub.
Datos utilizados -
Contagenos Volumétricas de Radares, Datos abiertos, gobernador brasileño.

[ 1 ] Análisis profundo de funciones: marcas de agua en Apache Spark Structured Streaming — Max Fisher en el blog de Databricks
[2] Chambers, B. y Zaharia, M. (2018). Spark: La guía definitiva: procesamiento de big data simplificado. “O'Reilly Media, Inc.”.
[ 3 ] Logística, envío y transporte en tiempo real con Apache Kafka- Kai Waehner
[ 4 ] Con Apache Kafka en Netflix Studio y Finance World — Blog confluente
[5] Transmisión de chispas y Kafka - https://sparkbyexamples.com/

Una mirada rápida a Spark Structured Streaming + Kafka Publicado nuevamente desde la fuente https://towardsdatascience.com/a-fast-look-at-spark-structured-streaming-kafka-f0ff64107325?source=rss—-7f60cf5620c9—4 vía https:/ /towardsdatascience.com/feed

<!–

->

Sello de tiempo:

Mas de Consultores Blockchain