MQTT load testing is surprisingly difficult because most load testing tools are designed for request/response protocols, not persistent connections and message queues.
Here’s a simplified view of a Locust test for an MQTT broker. Imagine this locustfile.py:
from locust import HttpUser, task, between
import paho.mqtt.client as mqtt
class MQTTClient:
def __init__(self, host, port, username=None, password=None):
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
self.connected = False
self.host = host
self.port = port
self.username = username
self.password = password
def connect(self):
try:
self.client.connect(self.host, self.port, 60)
self.client.loop_start()
except Exception as e:
print(f"Connection failed: {e}")
def disconnect(self):
self.client.loop_stop()
self.client.disconnect()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = True
print("Connected successfully.")
else:
print(f"Connection failed with code {rc}")
def on_disconnect(self, client, userdata, rc):
self.connected = False
print(f"Disconnected with code {rc}")
def on_message(self, client, userdata, msg):
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
def publish(self, topic, payload):
if self.connected:
self.client.publish(topic, payload)
else:
print("Not connected, cannot publish.")
def subscribe(self, topic):
if self.connected:
self.client.subscribe(topic)
else:
print("Not connected, cannot subscribe.")
class MQTTUser(HttpUser):
wait_time = between(1, 5)
host = "mqtt://localhost:1883" # This is used by HttpUser, but we'll override it
def on_start(self):
# In a real scenario, you'd get host/port from Locust's web UI or env vars
mqtt_host = "localhost"
mqtt_port = 1883
self.mqtt_client = MQTTClient(mqtt_host, mqtt_port)
self.mqtt_client.connect()
def on_stop(self):
self.mqtt_client.disconnect()
@task
def publish_message(self):
if self.mqtt_client and self.mqtt_client.connected:
self.mqtt_client.publish("test/topic", "Hello from Locust!")
@task
def subscribe_to_topic(self):
if self.mqtt_client and self.mqtt_client.connected:
self.mqtt_client.subscribe("test/topic")
This locustfile.py defines an MQTTUser that simulates an MQTT client. When a user starts (on_start), it creates an MQTTClient instance and connects to the broker. Each task (publish_message, subscribe_to_topic) attempts to perform an MQTT operation. The wait_time controls the pause between tasks for each simulated user.
The core problem Locust solves here is managing a large number of persistent connections and simulating asynchronous message flow. Unlike HTTP where a request is sent and a response is received, MQTT involves establishing a connection, subscribing to topics, publishing messages, and potentially receiving messages. Locust’s event-driven model, combined with a library like paho-mqtt, allows us to simulate this.
The System in Action:
When you run Locust with this file (locust -f locustfile.py), you’ll see the Locust web UI. You can then specify the number of users and the spawn rate. For each user you start, a new paho-mqtt client will be instantiated and connect to your MQTT broker (e.g., Mosquitto, EMQX).
These clients will then begin executing the tasks defined. If you have a subscriber running elsewhere (or another Locust user subscribed to "test/topic"), you’ll see messages being published and received. Locust will track connection successes/failures, publish rates, and subscribe rates, giving you metrics on your broker’s performance under load.
The Mental Model:
- User as Client: Each simulated "user" in Locust is an independent MQTT client.
- Connection Management:
on_startandon_stophandle establishing and tearing down these persistent MQTT connections. This is crucial because MQTT is connection-oriented. - Task Execution: Tasks (
@task) define the actions these clients perform after connecting. These are not necessarily sequential HTTP requests but rather discrete MQTT operations likepublishorsubscribe. - Asynchronous Nature: MQTT is inherently asynchronous.
paho-mqtt’sloop_start()runs a background thread to handle incoming messages and network events. Locust’s tasks trigger the initiation of these operations. - Metrics Collection: Locust collects statistics on task execution times, success rates, and user counts, which translate to connection rates, publish rates, subscribe rates, etc., for your MQTT broker.
The Lever You Control:
The wait_time in Locust is your primary lever for controlling the rate at which individual users perform actions after they are connected. A shorter wait_time means a user will try to publish or subscribe more frequently. The total load comes from the number of users you simulate. You can also define multiple tasks with different weights to simulate varied client behavior (e.g., some clients publish more than subscribe).
A common misconception is that HttpUser is suitable for MQTT just because it’s a user. However, HttpUser is fundamentally built around HTTP’s request/response model. For protocols like MQTT, WebSockets, or other persistent connection protocols, you need to use User (the base class) and implement the protocol’s connection and message handling logic yourself, typically using a dedicated client library like paho-mqtt for MQTT.
The next step is often simulating bidirectional communication, where clients both publish and subscribe, and you need to measure message delivery latency.