A Fast Look at Spark Structured Streaming + Kafka

Learning the basics of how to use this powerful duo for stream-processing tasks


Recently, I’ve been studying Apache Kafka and Apache Spark in depth, two leading technologies in the data engineering world.

I’ve built several projects using them in the last few months; “Machine Learning Streaming with Kafka, Debezium, and BentoML” is an example. My focus is on learning how to create powerful data pipelines with these popular modern tools and getting a sense of their advantages and disadvantages.

In recent months, I’ve covered how to create ETL pipelines using each tool separately, but never using them together, and that’s the gap I’ll be filling today.

Our goal is to learn the general idea behind building a streaming application with Spark+Kafka and give a fast look at its main concepts using real data.

The idea is simple — Apache Kafka is a message streaming tool, where producers write messages on one end of a queue (called a topic) to be read by consumers on the other.

But it’s a very complex tool, built to be a resilient distributed messaging service, with all sorts of delivery guarantees (exactly once, at least once, at most once), message storage, and message replication, while also allowing flexibility, scalability, and high throughput. It has a broad set of use cases, like microservices communication, real-time event systems, and streaming ETL pipelines.

Apache Spark is a distributed memory-based data transformation engine.

It’s also a very complex tool, able to connect with all sorts of databases, file systems, and cloud infrastructure. It is geared to operate in distributed environments to parallelize processing between machines, achieving high-performance transformations by using its lazy evaluation philosophy and query optimizations.

The cool part about it is that, at the end of the day, the code is just your usual SQL query or (almost) your Python+pandas script, with all the witchcraft abstracted under a nice, user-friendly, high-level API.

Join these two technologies and we have a perfect match to build a streaming ETL pipeline.

We’ll be using the data from traffic sensors in the city of Belo Horizonte (BH), the capital of Minas Gerais (Brazil). It’s a huge dataset containing measurements of traffic flow in several places in the city. Each sensor periodically detects the type of vehicle driving at that location (car, motorcycle, bus/truck), its speed and length (and other information that we’re not going to use).

This dataset represents precisely one of the classical applications for streaming systems — a group of sensors sending their readings continuously from the field.

In this scenario, Apache Kafka can be used as an abstraction layer between the sensors and the applications that consume their data.

Kafka used as an abstraction layer between sources and services. Image by Author.

With this kind of infrastructure, it’s possible to build all sorts of (the so-called) real-time event-driven systems, like a program to detect and alert for traffic jams when the number of vehicles suddenly increases with a drop in average speed.

And that’s where Apache Spark comes into play.

It has a native module for stream processing called Spark Structured Streaming, which can connect to Kafka and process its messages.

Setting up the environment

All you need is Docker and docker-compose.

We’ll use a docker-compose configuration based on the following repositories: link spark, link kafka.

The ./src volume is where we’re going to put our scripts.

To start the environment, just run

docker-compose up

All the code is available in this GitHub repository.

One of the things I most liked when I started studying Spark was the similarity between its code and my usual Python+pandas scripts. Migrating was very easy.

Following the same logic, Spark’s streaming module is very similar to the usual Spark code, making it easy to migrate from batch applications to streaming ones.

With that said, in the following sections we’ll focus on learning the specificities of Spark Structured Streaming, i.e., what new features it brings.

Our first job

Let’s start slow and build a toy example

The first thing to do is create the Kafka topic from which our Spark job will consume the messages.

This is done by accessing the Kafka container terminal and executing:

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test_topic

To simulate a producer writing messages on this topic, let’s use the kafka-console-producer. Also inside the container:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test_topic --property "parse.key=true" --property "key.separator=:"

From now on, every line typed in the terminal will be sent as a message to the test topic. The character “:” is used to separate the message’s key and value (key:value).

Let’s create a Spark job to consume this topic.

The code needs to be put inside the /src/streaming folder (nothing special, just the folder that I chose).

The key thing to note is that we’re using the attributes readStream and writeStream, instead of the normal read and write. This is the main aspect that makes Spark treat our job as a streaming application.

To connect to Kafka, it is necessary to specify the server and topic. The option startingOffsets=“earliest” tells Spark to read the topic from the beginning. Also, because Kafka stores its messages in binary form, they need to be decoded to string.

The other options will be further explored.
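Putting it together, here is a minimal sketch of what read_test_stream.py might look like. This is an assumption-based reconstruction, not the repository’s exact script:

# /src/streaming/read_test_stream.py
# A minimal sketch; the repository's actual script may differ slightly.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("read_test_stream").getOrCreate()

# readStream instead of read: Spark treats the job as a streaming application
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test_topic")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka stores keys and values in binary form, so decode them to strings
decoded = df.select(
    col("key").cast("string"),
    col("value").cast("string"),
)

# writeStream instead of write: print each micro-batch to the console
query = decoded.writeStream.format("console").outputMode("append").start()
query.awaitTermination()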

Now, let’s access the Spark container and run the job.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/read_test_stream.py

After a few seconds of configuration, it will start consuming the topic.

Spark consuming messages from Kafka. Image by Author.

Spark Streaming works in micro-batching mode, and that’s why we see the “batch” information when it consumes the messages.

Micro-batching sits somewhere between full “true” streaming, where all messages are processed individually as they arrive, and the usual batch, where the data stays static and is consumed on demand. Spark waits some time trying to accumulate messages to process them together, reducing overhead and increasing latency. This can be tuned to your needs.
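For example, assuming the toy job above, the micro-batch interval can be set with a processing-time trigger (the 5-second value is just illustrative):

# Wait 5 seconds accumulating messages before processing each batch
query = (
    decoded.writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)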

I’m not a super fast typist, so Spark processes each message before I can add new ones to the current batch.

And that was our first streaming job!

I hope that you get the feeling: it’s not hard to code a stream processing job, but there are some gotchas.

Writing data to a Kafka stream

Now it’s time to start playing with the sensor data.

You can download the zip file from AUGUST 2022 and extract it into the /data volume. The data is originally in JSON and takes around 23 GB of space. The first thing to do is convert it to parquet to optimize disk space and reading time.

The spark jobs to do this are detailed in the GitHub repository, all you need to do is execute them:

spark-submit /src/transform_json_to_parquet.py
spark-submit /src/join_parquet_files.py

Depending on your machine, the execution may take some time. But it pays off: the final parquet file size is ~1 GB (more than 20x smaller) and much faster to read.

We also need to create the Kafka topic to receive our messages:

kafka-topics.sh --create --replication-factor 1 --bootstrap-server localhost:9092 --topic traffic_sensor

Optionally, if you want to display the arriving messages, it’s possible to set up a console consumer.

kafka-console-consumer.sh --topic traffic_sensor --bootstrap-server localhost:9092

Writing data to a Kafka topic is easy, but there are some details.

In Structured Streaming, the default behavior is not to infer the data schema (columns and their types), so we need to pass one.

Kafka messages are just key-value binary string pairs, so we need to represent our data in this format. This can be easily achieved by converting all rows to JSON strings, encoding them in binary, and storing the result in the “value” column.

Transforming columns to JSON strings. Image by Author.

Message keys are very important in Kafka, but they won’t be useful in our tests, so all messages will have the same key.

As mentioned before, this dataset is HUGE, so I limited the number of messages inserted to 500,000.

Finally, we pass the Kafka server and topic and a “checkpointLocation” where Spark will store the execution progress, which is useful for recovering from errors.
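A sketch of what insert_traffic_topic.py might look like follows. The paths, the schema-borrowing trick, and the checkpoint location are assumptions, and the 500,000-message cap is omitted; check the repository for the real script:

# /src/streaming/insert_traffic_topic.py
# A sketch only; paths and details are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, struct, to_json

spark = SparkSession.builder.appName("insert_traffic_topic").getOrCreate()

# Streaming reads don't infer schemas, so borrow it from a batch read
schema = spark.read.parquet("/data/parquet").schema

df = spark.readStream.schema(schema).parquet("/data/parquet")

# Kafka messages are key-value binary pairs: serialize each row as a JSON
# string into the "value" column; every message gets the same dummy key
messages = df.select(
    lit("key").alias("key"),
    to_json(struct([df[c] for c in df.columns])).alias("value"),
)

query = (
    messages.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "traffic_sensor")
    .option("checkpointLocation", "/tmp/checkpoint_insert")  # progress, for recovery
    .start()
)
query.awaitTermination()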

Executing the job:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/insert_traffic_topic.py
Inserting data into Kafka. Image by Author.

On the left, the Spark job reads the file; on the right, a kafka-console-consumer displays the arriving messages.

Our traffic topic is populated and almost ready to be processed.

It’s important to remember that we used a Spark job to populate our topic just for learning purposes. In a real scenario, the sensors themselves would send their readings directly to Kafka.

To simulate this dynamic behavior, the script below writes one row to the topic every 2.5 seconds.
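A possible sketch of such a script, using the kafka-python package (the file path and field handling are assumptions):

# A sketch of the slow producer; path and details are assumptions.
import json
import time

import pandas as pd
from kafka import KafkaProducer  # from the kafka-python package

producer = KafkaProducer(bootstrap_servers="localhost:9092")
df = pd.read_parquet("/data/parquet")  # hypothetical path

for _, row in df.iterrows():
    # Serialize the row as a JSON string, matching the Spark job's format
    producer.send(
        "traffic_sensor",
        key=b"key",
        value=json.dumps(row.to_dict(), default=str).encode("utf-8"),
    )
    producer.flush()
    time.sleep(2.5)  # one message every 2.5 seconds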

Output modes — Counting the number of vehicles by type

Moving on, let’s create a job to count the number of vehicles by type.

The column “Classificação” (Classification) contains the vehicle type detected.

As we’re reading from the topic, we need to convert the JSON binary strings back to the columnar format.

Once this is done, the query can be built as usual. It’s interesting to note that the heart of the query is just the select().groupBy().count() sequence; all the rest is streaming logic.
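A sketch of the job (the schema here lists only the column we need; the repository’s script may differ):

# /src/streaming/group_by_vehicle_type.py
# A sketch; the repository's actual script may differ.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName("group_by_vehicle_type").getOrCreate()

# Only the field we need; the full schema has more columns
schema = StructType([StructField("Classificação", StringType(), True)])

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "traffic_sensor")
    .option("startingOffsets", "earliest")
    .load()
)

# Decode the binary value and parse the JSON string back into columns
parsed = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# The heart of the query: a plain select/groupBy/count
counts = parsed.groupBy("Classificação").count()

query = counts.writeStream.format("console").outputMode("complete").start()
query.awaitTermination()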

So it’s time to address the outputMode() option.

The output mode of a stream application specifies how we want to (re)compute and write the results as new data arrives.

It can assume three different values:

  • Append: Only add new records to the output.
  • Complete: Recompute the full result for each new record.
  • Update: Write only the records that changed.

These modes may or may not make sense depending on the query. For example, the “complete” mode only makes sense if the query performs some aggregation, like a grouping.

Let’s execute the job in “complete” mode and look at the results.

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /src/streaming/group_by_vehicle_type.py
Caminhão: Truck, Automóvel: Car, Indefinido: Undefined, Ônibus: Bus, Moto: Motorcycle. Image by Author.

As new records are inserted into the stream (see the terminal on the right), the job recomputes the full result. This can be useful in situations where row ordering is important, like a ranking.

However, this approach may not be optimal if the number of groups is too big or the individual changes do not impact the overall result.

So, another option is to use the “update” output mode, which generates a new message only for the groups that have changed. See below:

The query with output mode “update”. Image by Author.

The “append” mode is not available for queries with grouping, so I can’t show it using the same job. But it is the simplest mode: it always adds new records to the output.

These output modes are easier to understand if you think about saving the results to a table. In complete mode, the table is rewritten for every new message processed; in update mode, only the lines where some change occurred; and append always adds new lines to the end.

Tumbling time window — Aggregating using time intervals

In streaming systems, messages have two different timestamps related to them:

  • Event time: when the message was created; in our case, the sensor’s reading time.
  • Processing time: when the message is read by the processing agent; in our case, when it reaches Spark.

An important feature of stream processing tools is the ability to handle event time processing. Tumbling windows are non-overlapping fixed time intervals used to make aggregations using event-time columns. To put it more simply, they slice the timeline into equally sized slices so each event belongs to a single interval.

For example, count, every 5 minutes, how many vehicles were detected in the last 5 minutes.

5min tumbling window. Image by Author.

The code below illustrates this:
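This is a sketch of the idea, assuming the messages carry an event-time column (hypothetically named “hora”, the sensor’s reading time; adjust to the real schema):

# A sketch; the event-time column name ("hora") is an assumption.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("tumbling_window").getOrCreate()

schema = StructType([
    StructField("Classificação", StringType(), True),
    StructField("hora", TimestampType(), True),  # hypothetical event-time column
])

parsed = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "traffic_sensor")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

# Tumbling window: non-overlapping 5-minute buckets over the event-time column
counts = parsed.groupBy(window(col("hora"), "5 minutes")).count()

query = counts.writeStream.format("console").outputMode("complete").start()
query.awaitTermination()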

This kind of processing can be extremely useful in many situations. Going back to the traffic jam detector proposed earlier, one possible approach is to measure the vehicles’ average speed in a 10 min window and see if it is below a certain threshold.

Event-time processing is a complex topic. Anything can happen when dealing with it: messages can be lost, arrive too late, or come out of order. Spark has several mechanisms to mitigate these issues, like watermarks, which we won’t focus on here.
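Just to give a taste, a watermark tells Spark how long to keep waiting for late events before finalizing a window. A sketch, reusing the parsed stream from the tumbling-window example:

# Accept events arriving up to 10 minutes late before closing each window
late_tolerant = (
    parsed
    .withWatermark("hora", "10 minutes")
    .groupBy(window(col("hora"), "5 minutes"))
    .count()
)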

Time windows can also be used in conjunction with other columns in the groupBy(). The example below counts the number of vehicles by type in a 5 min window.
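A sketch, again reusing the parsed stream (and the hypothetical “hora” column) from the tumbling-window example:

# Group by the time window AND the vehicle type in the same aggregation
counts_by_type = (
    parsed
    .groupBy(window(col("hora"), "5 minutes"), col("Classificação"))
    .count()
)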

Sliding time window — Flexibilization on the time intervals

Sliding time windows are a flexibilization of tumbling windows. Instead of creating non-overlapping intervals, they allow defining how often each interval will be created.

For example, every 5 minutes, count how many vehicles were detected in the last 30 minutes.

Because of that, events can belong to many intervals and be counted as many times as needed.

To define a sliding window, just pass the update interval to the window() function.
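A sketch: window() takes the window duration plus a slide duration, reusing the parsed stream from before:

# A 30-minute window computed every 5 minutes (the slide duration)
sliding_counts = (
    parsed
    .groupBy(window(col("hora"), "30 minutes", "5 minutes"))
    .count()
)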

Let’s see the output.

As we can see, we have 30 min windows being created every 5 min.

This flexibility can be quite useful for defining more specific business rules and more complex triggers. For example, our traffic jam detector can send responses every 5 seconds about the past 10 minutes and create an alert when the average car speed drops below 20 km/h.

This was a fast look at the main concepts of Spark Structured Streaming and how they can be applied with Kafka.

Apache Kafka and Apache Spark are both reliable and robust tools used by many companies to process incredible amounts of data daily, making them one of the strongest pairs for stream processing tasks.

We’ve learned how to populate, consume, and process Kafka topics using Spark jobs. This was no hard task: as mentioned throughout the post, the stream processing API is almost identical to the usual batch API, with just some minor adjustments.

We’ve also discussed different output modes, something specific to stream applications, and how each one can be used. Last but not least, we explored aggregations with time windows, one of the main capabilities of stream processing.

Again, this was just a fast look, and I’ll leave some references below if you want to explore deeper.

Hope I’ve helped somehow, thank you for reading! 🙂

All the code is available in this GitHub repository.
Data used: Contagens Volumétricas de Radares, open data from the Brazilian government.

[1] Feature Deep Dive: Watermarking in Apache Spark Structured Streaming — Max Fisher on the Databricks blog
[2] Chambers, B., & Zaharia, M. (2018). Spark: The Definitive Guide: Big Data Processing Made Simple. O’Reilly Media, Inc.
[3] Real-Time Logistics, Shipping, and Transportation with Apache Kafka — Kai Waehner
[4] Featuring Apache Kafka in the Netflix Studio and Finance World — Confluent blog
[5] Spark Streaming & Kafka — https://sparkbyexamples.com/
