MQTT and Spark work well together for real-time IoT data processing, provided you understand how the two interact.

Here’s a Structured Streaming job that ingests data from an MQTT broker, transforms it, and writes the result to the console.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("MQTT Spark Streaming")
  .master("local[*]") // Use local[*] for local testing
  .getOrCreate()

spark.sparkContext.setLogLevel("WARN") // Reduce verbosity

// Define your MQTT broker connection details
val mqttHost = "tcp://localhost:1883" // Replace with your MQTT broker address
val mqttTopic = "iot/data"

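// Read from MQTT. This assumes the Apache Bahir spark-sql-streaming-mqtt package
// is on the classpath; it is a separate dependency and is not bundled with Spark.
val mqttStreamDF = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", mqttTopic)
  .load(mqttHost) // The broker URL is passed as the load path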

// Assume incoming data is JSON
val parsedDF = mqttStreamDF.selectExpr("CAST(payload AS STRING) AS json_payload")
  // The Scala from_json overload that takes a DDL schema string also requires an options map
  .select(from_json(col("json_payload"), "temperature DOUBLE, humidity DOUBLE, deviceId STRING", Map.empty[String, String]).as("data"))
  .select("data.*")

// Example transformation: Calculate temperature in Fahrenheit
val transformedDF = parsedDF.withColumn("temperatureFahrenheit", (col("temperature") * 9 / 5) + 32)

// Write to console for demonstration
val query = transformedDF.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("5 seconds")) // Process data in 5-second micro-batches
  .start()

query.awaitTermination()

The core problem MQTT and Spark solve together is bridging the gap between high-volume, low-latency IoT device streams and Spark’s powerful, distributed batch and micro-batch processing capabilities. MQTT, with its publish-subscribe model, efficiently handles device communication and message queuing. Spark Streaming (or Structured Streaming, as used above) then consumes these messages, allowing for complex transformations, aggregations, and analytics that would be impractical directly on embedded devices.

Internally, the connector (built on the Eclipse Paho client library) acts as an MQTT client. It subscribes to the specified topics on the broker and buffers messages as they arrive. Structured Streaming then picks up the buffered messages as micro-batches. The Trigger.ProcessingTime("5 seconds") setting dictates how often Spark starts a new micro-batch: in this case, every five seconds. The outputMode("append") setting means that only rows added since the last trigger are written to the sink.

When dealing with JSON payloads, the from_json function is your best friend. It requires a predefined schema that matches the structure of your JSON data. If your schema is dynamic or deeply nested, you might need more sophisticated parsing or schema inference techniques.
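For a nested payload, one option is to build the schema programmatically with StructType and then flatten the parsed struct. The sketch below is a minimal illustration that runs in batch mode on a single sample record; the nested readings field and the sample values are assumptions made for the example, not part of the pipeline above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

val spark = SparkSession.builder()
  .appName("Nested from_json sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical nested payload: measurements grouped under a "readings" object
val nestedSchema = new StructType()
  .add("deviceId", StringType)
  .add("readings", new StructType()
    .add("temperature", DoubleType)
    .add("humidity", DoubleType))

// A single sample record stands in for the MQTT payload column
val raw = Seq(
  """{"deviceId":"sensor-1","readings":{"temperature":21.5,"humidity":40.0}}"""
).toDF("json_payload")

// Parse into a struct, then pull the nested fields up to top-level columns
val flattened = raw
  .select(from_json(col("json_payload"), nestedSchema).as("data"))
  .select(
    col("data.deviceId"),
    col("data.readings.temperature").as("temperature"),
    col("data.readings.humidity").as("humidity"))

flattened.show(false)
```

The same select works unchanged on a streaming DataFrame, since from_json is an ordinary column expression.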

The connector handles MQTT connection management and message acknowledgment for you. You configure it with the broker’s address and the topics you’re interested in. Be careful with assumptions about parallel ingestion, though: an ordinary MQTT subscription delivers every matching message to each subscribed client, so adding executors does not by itself split the incoming stream. The throughput gain on a cluster comes after ingestion, when Spark distributes the received rows across executors for parsing, transformation, and aggregation.
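As a rough sketch of what that configuration can look like, the helper below gathers connector options in one map. The function names mqttOptions and readMqtt are hypothetical, and the option keys (clientId, QoS, cleanSession) are assumed to match the Apache Bahir MQTT source; check them against the connector version you deploy.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper that gathers the connector options in one place.
// Option names are assumed to follow the Apache Bahir MQTT source.
def mqttOptions(topic: String, clientId: String, qos: Int): Map[String, String] = {
  require(qos >= 0 && qos <= 2, "MQTT QoS must be 0, 1, or 2")
  Map(
    "topic" -> topic,
    "clientId" -> clientId,     // stable ID so the broker can recognize the session
    "QoS" -> qos.toString,      // maximum quality of service for the subscription
    "cleanSession" -> "false"   // ask the broker to keep the session across reconnects
  )
}

// Hypothetical helper: builds the streaming DataFrame from a broker URL and options.
def readMqtt(spark: SparkSession, brokerUrl: String, options: Map[String, String]): DataFrame =
  spark.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .options(options)
    .load(brokerUrl)
```

A call such as readMqtt(spark, "tcp://localhost:1883", mqttOptions("iot/data", "spark-iot-reader", 1)) would then replace the inline reader, with the client ID being an illustrative value.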

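One of the less obvious but crucial aspects of this setup is how Spark manages state for aggregations or other stateful operations across micro-batches. Structured Streaming uses a state store, which by default is persisted to an HDFS-compatible file system under the query’s checkpoint location. When you perform operations like groupBy().agg(), Spark maintains the intermediate state between triggers. If a trigger fails or a node goes down, Spark can recover the state from the checkpoint, provided you set the checkpointLocation option on the query. Whether you end up with exactly-once or at-least-once processing depends on your configuration, on the sink, and on whether the source can replay messages. An MQTT broker does not let a client re-read arbitrary past messages, so check what your connector persists locally before relying on exactly-once semantics.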
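A minimal sketch of a stateful, checkpointed aggregation follows. To keep it runnable without a broker, it uses Spark’s built-in rate source to simulate readings; the column names eventTime, deviceId, and temperature, the window sizes, and the temporary checkpoint directory are all assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, col, concat, lit, window}

// Average temperature per device over 30-second event-time windows.
// The watermark lets Spark drop state for windows more than 1 minute late.
def averageTemperature(readings: DataFrame): DataFrame =
  readings
    .withWatermark("eventTime", "1 minute")
    .groupBy(window(col("eventTime"), "30 seconds"), col("deviceId"))
    .agg(avg("temperature").as("avgTemperature"))

val spark = SparkSession.builder()
  .appName("Stateful aggregation sketch")
  .master("local[*]")
  .getOrCreate()

// Simulated readings: the rate source emits (timestamp, value) rows
val simulated = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .select(
    col("timestamp").as("eventTime"),
    concat(lit("sensor-"), (col("value") % 3).cast("string")).as("deviceId"),
    (lit(20.0) + (col("value") % 10)).as("temperature"))

// Temporary directory for the demo; use durable storage in a real deployment
val checkpointDir = Files.createTempDirectory("iot-avg-checkpoint").toString

val aggQuery = averageTemperature(simulated).writeStream
  .outputMode("update") // emit only the windows whose averages changed
  .format("console")
  .option("truncate", "false")
  .option("checkpointLocation", checkpointDir)
  .start()

aggQuery.awaitTermination(15000) // let the demo run for about 15 seconds
aggQuery.stop()
```

Restarting the query with the same checkpoint directory lets Spark reload the window state instead of starting the averages from scratch.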

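The next challenge is often handling malformed data or schema drift from your IoT devices. With its default settings, from_json does not raise an error on a bad record; it returns null values instead, so malformed messages can slip through silently unless you filter for them.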
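One way to catch such records is to keep the raw payload next to the parsed struct and route rows that lack a required field to a separate DataFrame. The sketch below runs in batch mode on three sample strings; treating deviceId as the required field is an assumption for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

val spark = SparkSession.builder()
  .appName("Malformed payload sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val schema = new StructType()
  .add("temperature", DoubleType)
  .add("humidity", DoubleType)
  .add("deviceId", StringType)

// Sample payloads: one valid, one that is not JSON, one missing deviceId
val raw = Seq(
  """{"temperature":21.5,"humidity":40.0,"deviceId":"sensor-1"}""",
  "not json at all",
  """{"temperature":19.0}"""
).toDF("json_payload")

// Keep the raw string alongside the parsed struct so rejects can be inspected
val parsed = raw.withColumn("data", from_json(col("json_payload"), schema))

// A null deviceId covers both unparseable input and records missing the field
val valid = parsed.filter(col("data.deviceId").isNotNull).select("data.*")
val rejected = parsed.filter(col("data.deviceId").isNull).select("json_payload")

valid.show(false)
rejected.show(false)
```

In a streaming job, the rejected rows could be written to their own sink for later inspection while the valid rows continue through the pipeline.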
