The most surprising thing about using Locust with InfluxDB is that the primary benefit isn’t just storing metrics, but unlocking the ability to correlate load test results with other time-series data already residing in InfluxDB.

Let’s see it in action. Imagine you’re running a load test against a new API endpoint. You’ve got Locust spitting out requests per second, response times, and failure rates. But what if you also want to see how your CPU, memory, or disk I/O on the server was behaving at the exact same time as those spikes in response times? Or maybe you want to correlate it with network latency metrics from a separate monitoring tool that also feeds into InfluxDB.

Here’s a simplified Locust setup that sends metrics to InfluxDB.

# locustfile.py
from locust import HttpUser, task, between
import time
import influxdb_client
from influxdb_client.client.write_api import SYNCHRONOUS

class WebsiteUser(HttpUser):
    wait_time = between(1, 5)
    host = "http://your-api-host.com"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
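        # InfluxDB connection details (placeholders; replace with your own values).
        # Note: this opens one client per simulated user, which is fine for a demo;
        # share a single client when simulating many users.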
        self.influxdb_url = "http://localhost:8086"
        self.influxdb_token = "YOUR_INFLUXDB_TOKEN"
        self.influxdb_org = "your-org"
        self.influxdb_bucket = "locust-metrics"

        self.influxdb_client = influxdb_client.InfluxDBClient(
            url=self.influxdb_url,
            token=self.influxdb_token,
            org=self.influxdb_org
        )
        self.write_api = self.influxdb_client.write_api(write_options=SYNCHRONOUS)

    @task
    def index(self):
        self.client.get("/")

    def on_stop(self):
        self.influxdb_client.close()

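    # Custom stats reporting to InfluxDB.
    # NOTE: Locust does not call a method with this name automatically. Invoke it
    # yourself (e.g. from a periodic greenlet) with iterables of stats objects.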
    def on_batch_update(self, locust_stats, entry_point_stats, exception_stats):
        timestamp = time.time_ns()  # Integer nanoseconds since the Unix epoch

        # Report overall stats
        for stat in locust_stats:
            point = influxdb_client.Point("locust_stats") \
                .tag("name", stat.name) \
                .tag("method", stat.method) \
                .field("count", stat.num_requests) \
                .field("num_failures", stat.num_failures) \
                .field("avg_response_time", stat.avg_response_time) \
                .field("min_response_time", stat.min_response_time) \
                .field("max_response_time", stat.max_response_time) \
                .field("avg_content_length", stat.avg_content_length) \
                .field("current_rps", stat.current_rps) \
                .field("current_fail_per_sec", stat.current_fail_per_sec) \
                .time(timestamp)
            self.write_api.write(bucket=self.influxdb_bucket, org=self.influxdb_org, record=point)

        # Report entry point stats
        for entry_point_stat in entry_point_stats:
            point = influxdb_client.Point("locust_entry_point_stats") \
                .tag("name", entry_point_stat.name) \
                .tag("method", entry_point_stat.method) \
                .field("count", entry_point_stat.num_requests) \
                .field("num_failures", entry_point_stat.num_failures) \
                .field("avg_response_time", entry_point_stat.avg_response_time) \
                .field("min_response_time", entry_point_stat.min_response_time) \
                .field("max_response_time", entry_point_stat.max_response_time) \
                .field("avg_content_length", entry_point_stat.avg_content_length) \
                .field("current_rps", entry_point_stat.current_rps) \
                .field("current_fail_per_sec", entry_point_stat.current_fail_per_sec) \
                .time(timestamp)
            self.write_api.write(bucket=self.influxdb_bucket, org=self.influxdb_org, record=point)

        # Report exception stats
        for exception_stat in exception_stats:
            point = influxdb_client.Point("locust_exception_stats") \
                .tag("name", exception_stat.name) \
                .tag("exception", exception_stat.exception_type.__name__) \
                .field("count", exception_stat.count) \
                .time(timestamp)
            self.write_api.write(bucket=self.influxdb_bucket, org=self.influxdb_org, record=point)

To run this, you’ll need the influxdb-client Python package (pip install influxdb-client) and a running InfluxDB 2.x instance. Make sure the instance has a bucket named locust-metrics and an organization named your-org, and that you have a token with write access to that bucket.
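Rather than hard-coding the token in the locustfile, the connection details can be read from the environment. The following is a minimal sketch using only the standard library; the LOCUST_INFLUX_* variable names and the InfluxSettings class are hypothetical, not something Locust or influxdb-client defines.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class InfluxSettings:
    url: str
    token: str
    org: str
    bucket: str


def load_influx_settings(env=None):
    """Build connection settings from environment variables.

    The variable names below are made up for this example.
    """
    env = os.environ if env is None else env
    token = env.get("LOCUST_INFLUX_TOKEN")
    if not token:
        # Fail early instead of sending unauthenticated writes later.
        raise RuntimeError("LOCUST_INFLUX_TOKEN is not set")
    return InfluxSettings(
        url=env.get("LOCUST_INFLUX_URL", "http://localhost:8086"),
        token=token,
        org=env.get("LOCUST_INFLUX_ORG", "your-org"),
        bucket=env.get("LOCUST_INFLUX_BUCKET", "locust-metrics"),
    )
```

The resulting values can then be passed to InfluxDBClient(url=..., token=..., org=...) in place of the literals used above.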

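The core of this integration is the on_batch_update method on the HttpUser class. One caveat: on_batch_update is not a hook that Locust defines or calls by itself, so the method runs only if your own code invokes it periodically (from a background greenlet or an event listener, for instance) and passes in the statistics, such as the entries held in environment.stats. Once it runs, the statistics are no longer confined to Locust’s web UI or standard output: we create an influxdb_client.Point object for each stats entry. Each point includes: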

  • Measurement: A string identifying the type of data (e.g., locust_stats, locust_entry_point_stats).
  • Tags: Key-value pairs that are indexed and used for filtering and grouping (e.g., name of the endpoint, method like GET/POST, exception type).
  • Fields: The actual metric values (e.g., count, avg_response_time, current_rps).
  • Timestamp: Crucial for time-series analysis. We generate a nanosecond-precision timestamp.
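To make these four parts concrete, here is a rough sketch of how a point maps onto InfluxDB line protocol, the text format the client sends over the wire. It is a simplified illustration written with the standard library only; to_line_protocol and its helpers are hypothetical names, and the real client handles more cases (unsigned integers, None values, precision conversion).

```python
def _escape_key(value):
    # Tag keys, tag values and field keys escape commas, equals signs and spaces.
    return str(value).replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def _format_field(value):
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an "i" suffix
    if isinstance(value, float):
        return repr(value)
    # Everything else is written as a quoted string.
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Render one point as: measurement,tags fields timestamp."""
    name = measurement.replace(",", "\\,").replace(" ", "\\ ")
    tag_part = "".join(
        f",{_escape_key(k)}={_escape_key(v)}" for k, v in sorted(tags.items())
    )
    field_part = ",".join(
        f"{_escape_key(k)}={_format_field(v)}" for k, v in fields.items()
    )
    return f"{name}{tag_part} {field_part} {timestamp_ns}"


print(to_line_protocol(
    "locust_stats",
    {"name": "/", "method": "GET"},
    {"count": 42, "avg_response_time": 12.5},
    1700000000000000000,
))
# locust_stats,method=GET,name=/ count=42i,avg_response_time=12.5 1700000000000000000
```

Tags end up in the indexed series key before the first space, while fields carry the typed values, which is why endpoint names belong in tags and response times in fields.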

These points are then written to your specified InfluxDB bucket using the write_api.
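One way to drive this kind of periodic reporting without relying on a method of the user class is Locust’s events.init hook combined with a background greenlet that reads environment.stats. The sketch below assumes a write_api created as in the locustfile above; stats_to_records and register_influx_reporter are hypothetical helper names, and the 5-second interval is an arbitrary choice.

```python
import time


def stats_to_records(entries, user_count, timestamp_ns):
    """Convert Locust-style stats entries into dict records for write_api.write()."""
    records = []
    for entry in entries:
        records.append({
            "measurement": "locust_stats",
            "tags": {"name": entry.name, "method": entry.method},
            "fields": {
                "count": entry.num_requests,
                "num_failures": entry.num_failures,
                # Cast to float so the field type never flips between int and float.
                "avg_response_time": float(entry.avg_response_time),
                "current_rps": float(entry.current_rps),
                "current_user_count": user_count,
            },
            "time": timestamp_ns,
        })
    return records


def register_influx_reporter(write_api, bucket, org, interval_s=5.0):
    """Attach a background reporter to Locust; call once from the locustfile."""
    # Imported here so stats_to_records can be used without Locust installed.
    import gevent
    from locust import events
    from locust.runners import WorkerRunner

    def report_forever(environment):
        while True:
            gevent.sleep(interval_s)
            records = stats_to_records(
                environment.stats.entries.values(),
                environment.runner.user_count,
                time.time_ns(),
            )
            if records:
                write_api.write(bucket=bucket, org=org, record=records)

    @events.init.add_listener
    def on_locust_init(environment, **_kwargs):
        # In distributed runs, report from the master (or a standalone process) only.
        if not isinstance(environment.runner, WorkerRunner):
            gevent.spawn(report_forever, environment)
```

Keeping the conversion in a plain function makes it easy to check the record shape against fake entries before pointing it at a real InfluxDB instance.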

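Once the data is in InfluxDB, the real power comes from querying it, often using Grafana as a visualization layer. You can write Flux queries like this one to get the average response time for a /login endpoint (the example locustfile above only requests /, so substitute a request name that your own test reports):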

from(bucket: "locust-metrics")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "locust_stats")
  |> filter(fn: (r) => r["name"] == "/login")
  |> filter(fn: (r) => r["_field"] == "avg_response_time")
  |> aggregateWindow(every: v.windowPeriod, fn: mean)
  |> yield(name: "mean_response_time")

This query reads from the locust-metrics bucket over the dashboard’s time range (v.timeRangeStart, v.timeRangeStop, and v.windowPeriod are variables supplied by Grafana). It keeps only the locust_stats measurement, the /login endpoint, and the avg_response_time field, then averages the values within each time window to smooth the series.
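The windowing step is the part of the query that most affects what the graph looks like. As a rough illustration of the idea (not of Flux’s actual implementation), the sketch below groups timestamped samples into fixed windows and averages each one; window_mean is a hypothetical helper, and it keys each window by its start time for readability.

```python
from collections import defaultdict


def window_mean(points, every_s):
    """Group (timestamp_s, value) pairs into fixed windows and average each one."""
    buckets = defaultdict(list)
    for ts, value in points:
        window_start = (ts // every_s) * every_s
        buckets[window_start].append(value)
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


samples = [(0, 100.0), (4, 200.0), (10, 400.0), (19, 600.0), (25, 50.0)]
print(window_mean(samples, 10))
# {0: 150.0, 10: 500.0, 20: 50.0}
```

A wider window produces a smoother line but hides short spikes, so the window period is worth choosing deliberately rather than leaving entirely to the dashboard default.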

The key insight is that InfluxDB is a database for time-stamped data. Locust is just one source of that data. You can simultaneously ingest metrics from your application servers (CPU, memory, network), infrastructure components, other services, and even manually injected business events. Then, in Grafana or directly via Flux, you can plot these disparate data streams on the same graph. Did response times spike because of increased load (Locust RPS), or was it a server resource constraint (CPU usage from another InfluxDB measurement)? You can see it all side-by-side.
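Side-by-side plots are usually enough, but the same comparison can be made numerically once two series share timestamps. The sketch below assumes both series have already been windowed to a common interval and exported as {timestamp: value} dictionaries; align and pearson are hypothetical helpers, and the sample numbers are invented for illustration.

```python
from math import sqrt


def align(series_a, series_b):
    """Pair up values from two {timestamp: value} series on shared timestamps."""
    shared = sorted(series_a.keys() & series_b.keys())
    return [(series_a[t], series_b[t]) for t in shared]


def pearson(pairs):
    """Pearson correlation coefficient of a list of (x, y) pairs."""
    n = len(pairs)
    if n < 2:
        raise ValueError("need at least two aligned samples")
    mean_x = sum(x for x, _ in pairs) / n
    mean_y = sum(y for _, y in pairs) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in pairs)
    var_x = sum((x - mean_x) ** 2 for x, _ in pairs)
    var_y = sum((y - mean_y) ** 2 for _, y in pairs)
    if var_x == 0 or var_y == 0:
        raise ValueError("a constant series has no defined correlation")
    return cov / sqrt(var_x * var_y)


# Invented sample data: response time (ms) and CPU usage (%) per 10-second window.
response_ms = {0: 100, 10: 200, 20: 300, 30: 400}
cpu_percent = {0: 20, 10: 40, 20: 60, 40: 99}
pairs = align(response_ms, cpu_percent)  # only timestamps 0, 10 and 20 are shared
print(pearson(pairs))
```

A coefficient near 1 suggests the two metrics rise together, which points at a resource constraint worth investigating; it does not by itself show which one causes the other.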

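A subtle but important detail is how the statistics are aggregated. Locust has no batch_size setting; what controls granularity is how often on_batch_update is invoked and what each field represents. On Locust’s stats entries, values such as num_requests and avg_response_time accumulate from the start of the run (or the last stats reset), while current_rps and current_fail_per_sec cover only the last few seconds. If you report rarely (say, once a minute), short spikes fall between samples, and a cumulative average flattens them further as the run gets longer. For high-resolution analysis, report every few seconds and derive per-interval values from the cumulative counters.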
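If the count and average you report are cumulative since the start of the test, which is how I understand Locust’s stats entries to behave, the average for just the most recent interval can be recovered from two consecutive snapshots. The helper below is a hypothetical sketch of that arithmetic.

```python
def interval_average(prev_count, prev_avg, curr_count, curr_avg):
    """Average response time of only the requests made between two snapshots."""
    new_requests = curr_count - prev_count
    if new_requests <= 0:
        # Nothing happened in this interval, or the stats were reset.
        return None
    prev_total = prev_count * prev_avg  # total response time up to the first snapshot
    curr_total = curr_count * curr_avg  # total response time up to the second snapshot
    return (curr_total - prev_total) / new_requests


# The cumulative average only moved from 100 ms to 110 ms,
# but the 100 requests in between averaged 210 ms.
print(interval_average(1000, 100, 1100, 110))
# 210.0
```

The same subtraction can be done at query time on the stored count and avg_response_time fields, which keeps the reporting code simple.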

The next step is to explore how to correlate Locust’s failure metrics with specific error logs from your application, which can also be ingested into InfluxDB.
