A custom Locust client lets you load test anything, not just HTTP.

Let’s say you need to test a WebSocket chat application. Locust’s built-in HTTP client won’t cut it. You need to establish a persistent WebSocket connection, send messages, and assert responses, all within Locust’s task execution model.

Here’s a simplified example of a custom client for a hypothetical WebSocket service.

from locust import HttpUser, task, between
import websocket
import json

class WebSocketClient:
    def __init__(self, host):
        self.host = host
        self.ws = None
        self.connect()

    def connect(self):
        try:
            self.ws = websocket.create_connection(self.host)
            print(f"Connected to {self.host}")
        except Exception as e:
            print(f"Failed to connect to {self.host}: {e}")
            self.ws = None

    def send_message(self, message):
        if self.ws:
            try:
                self.ws.send(json.dumps(message))
                print(f"Sent: {message}")
                return True
            except Exception as e:
                print(f"Failed to send message: {e}")
                return False
        return False

    def receive_message(self, timeout=5):
        if self.ws:
            try:
                result = self.ws.recv()
                print(f"Received: {result}")
                return result
            except websocket.WebSocketTimeoutException:
                print("Receive timed out.")
                return None
            except Exception as e:
                print(f"Failed to receive message: {e}")
                return None
        return None

    def disconnect(self):
        if self.ws:
            self.ws.close()
            print(f"Disconnected from {self.host}")

class ChatUser(HttpUser):
    wait_time = between(1, 5)
    host = "ws://localhost:8080" # Replace with your WebSocket server address

    def on_start(self):
        # Initialize the custom client
        self.client = WebSocketClient(self.host)
        if not self.client.ws:
            # If connection fails, we can't proceed with this user
            self.environment.runner.quit()

    def on_stop(self):
        # Clean up the custom client
        self.client.disconnect()

    @task
    def send_chat_message(self):
        message_payload = {"type": "chat", "text": "Hello, Locust!"}
        if self.client.send_message(message_payload):
            # Optionally receive and process a response
            response = self.client.receive_message()
            if response:
                # Assertions can be made here based on the expected response
                pass

This ChatUser class defines a custom client (WebSocketClient) that handles establishing and managing a WebSocket connection. The on_start method creates an instance of this client, and on_stop ensures it’s disconnected. The @task method then uses the client’s send_message and receive_message methods to interact with the WebSocket server.

The core problem this solves is extending Locust beyond its HTTP-centric origins. Many modern applications use protocols like WebSockets, gRPC, MQTT, or raw TCP sockets for communication. To load test these, you need a way to programmatically establish connections, send/receive data according to the protocol’s specification, and measure performance. A custom client pattern in Locust provides this flexibility.

Internally, Locust’s HttpUser class is itself a user class that wraps a requests session. When you define a custom user class (like ChatUser), you’re essentially doing the same thing: creating a user that manages its own specific client object. This client object encapsulates all the logic for interacting with your target system. The on_start and on_stop methods are crucial for setting up and tearing down the client’s state (like establishing a connection) before and after the user’s tasks run. The @task methods then operate on this client instance.

The most surprising thing about custom clients is how little Locust actually needs to know about your protocol. As long as your custom client object exposes methods that your tasks can call (like send_message, receive_message, connect, disconnect), Locust is happy. It treats your client object as a black box for execution purposes. The performance metrics (requests per second, response times) are still collected by Locust based on the duration of your task executions and any explicit timing you might wrap around your client calls, but the protocol-specific success/failure and data handling is entirely up to your custom client implementation.

To make this truly robust for load testing, you’d typically wrap the send_message and receive_message calls within your task with Locust’s statistics reporting. For example, instead of just self.client.send_message(...), you’d use something like:

from locust import HttpUser, task, between, tag
import websocket
import json
import time

class WebSocketClient:
    # ... (previous WebSocketClient code) ...
    def __init__(self, host, environment): # Pass environment for reporting
        self.host = host
        self.ws = None
        self.environment = environment # Store environment
        self.connect()

    def connect(self):
        start_time = time.time()
        try:
            self.ws = websocket.create_connection(self.host)
            total_time = time.time() - start_time
            self.environment.events.request.fire(
                request_type="websocket",
                name="connect",
                response_time=total_time * 1000, # ms
                response_length=0, # or some meaningful value
                exception=None,
                context={}
            )
            print(f"Connected to {self.host}")
            return True
        except Exception as e:
            total_time = time.time() - start_time
            self.environment.events.request.fire(
                request_type="websocket",
                name="connect",
                response_time=total_time * 1000,
                response_length=0,
                exception=e,
                context={}
            )
            print(f"Failed to connect to {self.host}: {e}")
            self.ws = None
            return False

    def send_message(self, message, message_name="send"):
        if self.ws:
            start_time = time.time()
            try:
                self.ws.send(json.dumps(message))
                total_time = time.time() - start_time
                self.environment.events.request.fire(
                    request_type="websocket",
                    name=f"{message_name}_message",
                    response_time=total_time * 1000,
                    response_length=len(json.dumps(message)),
                    exception=None,
                    context={}
                )
                print(f"Sent: {message}")
                return True
            except Exception as e:
                total_time = time.time() - start_time
                self.environment.events.request.fire(
                    request_type="websocket",
                    name=f"{message_name}_message",
                    response_time=total_time * 1000,
                    response_length=0,
                    exception=e,
                    context={}
                )
                print(f"Failed to send message: {e}")
                return False
        return False

    def receive_message(self, timeout=5, message_name="receive"):
        if self.ws:
            start_time = time.time()
            try:
                result = self.ws.recv()
                total_time = time.time() - start_time
                self.environment.events.request.fire(
                    request_type="websocket",
                    name=f"{message_name}_message",
                    response_time=total_time * 1000,
                    response_length=len(result) if result else 0,
                    exception=None,
                    context={}
                )
                print(f"Received: {result}")
                return result
            except websocket.WebSocketTimeoutException:
                total_time = time.time() - start_time
                self.environment.events.request.fire(
                    request_type="websocket",
                    name=f"{message_name}_message",
                    response_time=total_time * 1000,
                    response_length=0,
                    exception=websocket.WebSocketTimeoutException("Receive timed out."),
                    context={}
                )
                print("Receive timed out.")
                return None
            except Exception as e:
                total_time = time.time() - start_time
                self.environment.events.request.fire(
                    request_type="websocket",
                    name=f"{message_name}_message",
                    response_time=total_time * 1000,
                    response_length=0,
                    exception=e,
                    context={}
                )
                print(f"Failed to receive message: {e}")
                return None
        return None

    def disconnect(self):
        if self.ws:
            try:
                self.ws.close()
                print(f"Disconnected from {self.host}")
            except Exception as e:
                print(f"Error during disconnect: {e}")

class ChatUser(HttpUser):
    wait_time = between(1, 5)
    host = "ws://localhost:8080" # Replace with your WebSocket server address

    def on_start(self):
        # Initialize the custom client, passing the environment
        self.client = WebSocketClient(self.host, self.environment)
        if not self.client.ws:
            self.environment.runner.quit()

    def on_stop(self):
        self.client.disconnect()

    @task
    def send_and_receive_chat(self):
        message_payload = {"type": "chat", "text": "Hello, Locust!"}
        if self.client.send_message(message_payload, message_name="send_chat"):
            # Receive a response, potentially from a specific action
            response = self.client.receive_message(message_name="receive_chat_response")
            if response:
                # Add specific assertions here if needed
                pass

This refined WebSocketClient now uses self.environment.events.request.fire() to report metrics for connection, sending, and receiving directly to Locust’s statistics. This allows you to see connection times, message send times, and receive times in the Locust Web UI, just like you would for HTTP requests.

The next step is understanding how to integrate these custom clients with Locust’s distributed mode for large-scale testing.

Want structured learning?

Take the full Locust course →