With JetStream, NATS doesn’t actually penalize slow consumers; it simply stops sending them messages until they catch up.
Let’s see this in action, starting with plain core NATS (no JetStream). Imagine a simple setup with a publisher sending messages to a subject, and a consumer listening on that subject.
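```go
// Publisher
package main

import (
	"log"
	"strconv"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	for i := 0; i < 1000; i++ {
		// strconv.Itoa formats the number; string(i) would yield a rune, not "42".
		msg := []byte("Hello NATS #" + strconv.Itoa(i))
		if err := nc.Publish("updates", msg); err != nil {
			log.Fatal(err)
		}
		time.Sleep(10 * time.Millisecond) // Simulate some work
	}
	// Make sure buffered messages reach the server before exiting.
	if err := nc.Flush(); err != nil {
		log.Fatal(err)
	}
	log.Println("Published 1000 messages.")
}
```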
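```go
// Slow Consumer
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL,
		// Surface asynchronous errors such as nats.ErrSlowConsumer,
		// which the client reports when it drops messages for a subscription.
		nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
			log.Printf("async error: %v", err)
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// nats.go invokes this handler for one message at a time; messages that
	// arrive in the meantime wait in the subscription's pending buffer.
	sub, err := nc.Subscribe("updates", func(m *nats.Msg) {
		log.Printf("Received message: %s", m.Data)
		time.Sleep(500 * time.Millisecond) // Simulate slow processing
		log.Printf("Finished processing message: %s", m.Data)
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	log.Println("Subscribed to updates and processing messages slowly.")
	select {} // Block forever
}
```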
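Start the slow consumer first, then the publisher: core NATS doesn’t store messages, so anything published before the subscription exists is never delivered. You’ll notice that messages arrive at the consumer, but there’s a significant delay between "Received message" and "Finished processing." The publisher, however, never slows down. Core NATS applies no flow control to publishers: the publisher finishes in roughly 10 seconds, the server forwards every message right away, and the nats.go client queues the backlog in the subscription’s pending buffer while the handler works through it one message at a time, which takes more than 8 minutes.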
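To put numbers on that backlog, here is a small stdlib-only Go model of the demo. It is a simulation, not nats.go code: it assumes exactly 500 ms of work per message, a handler that runs one message at a time, and negligible network latency. simulateBacklog is a hypothetical helper written only for this illustration.

```go
package main

import "fmt"

// simulateBacklog models the demo: n messages are published every pubMs
// milliseconds to a core NATS subscription whose handler processes them one at
// a time, taking workMs each. It returns the peak number of messages waiting in
// the client's pending queue (arrived but not yet handed to the handler) and the
// time in milliseconds at which the handler finishes the last message.
func simulateBacklog(n, pubMs, workMs int) (peakPending, finishMs int) {
	starts := make([]int, n)
	busyUntil := 0
	for i := 0; i < n; i++ {
		arrive := i * pubMs
		start := arrive
		if busyUntil > start {
			start = busyUntil // handler still busy: the message waits its turn
		}
		starts[i] = start
		busyUntil = start + workMs
	}
	// The queue only grows when a message arrives, so checking each arrival
	// instant is enough to find the peak. starts is non-decreasing.
	started := 0
	for i := 0; i < n; i++ {
		t := i * pubMs
		for started < n && starts[started] <= t {
			started++
		}
		if waiting := (i + 1) - started; waiting > peakPending {
			peakPending = waiting
		}
	}
	return peakPending, busyUntil
}

func main() {
	peak, finish := simulateBacklog(1000, 10, 500)
	fmt.Printf("peak pending: %d messages\n", peak)
	fmt.Printf("last message finished at %d ms\n", finish)
}
```

With the demo’s numbers it reports a peak of 980 messages waiting and the last message finishing at 500000 ms, about 8 minutes after the publisher’s roughly 10-second run. Nothing is lost in the demo only because 1000 messages fit comfortably inside the client’s pending limits.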
The core problem flow control addresses is a fast publisher overwhelming a slow consumer, which leads to message loss or runaway memory use on the consumer side. How NATS deals with it depends on the layer. With JetStream (persistence and delivery guarantees), the server tracks unacknowledged messages per consumer and stops delivering once the configured limit is reached. With core NATS there are no acknowledgments: the server buffers outbound data for each connection only up to a limit, and the client library holds delivered messages in a per-subscription pending buffer that is also bounded, so how quickly the client processes incoming data decides whether anything is lost.
Here’s how the mental model breaks down:
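- Core NATS Subscriptions: In the basic NATS model, the server pushes messages to subscribers with no acknowledgments and no pause. If a client stops reading from its socket, the server’s outbound buffer for that connection grows; once it exceeds the server’s limit (the max_pending setting) or a write misses its deadline, the server flags the client as a slow consumer and closes the connection. That is a real penalty, not a pause. More commonly, as in the demo above, the client library keeps reading from the network and the backlog builds up in its per-subscription pending buffer instead. When that buffer reaches its limits (adjustable with SetPendingLimits on the subscription), new messages are dropped and the client reports nats.ErrSlowConsumer to the connection’s async error handler.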
- JetStream: This is where explicit flow control and consumer management come into play. JetStream adds streams (persistent storage for subjects), consumers, message acknowledgments (acks), and delivery policies.
  - Acknowledgments (Acks): With an explicit ack policy, the consumer must acknowledge each message it receives. The ack tells the JetStream server that the message has been processed successfully. If a message isn’t acknowledged within the AckWait set in the consumer configuration, JetStream redelivers it.
  - Delivery Policies: DeliverPolicy (e.g., All, Last, New, ByStartSequence, ByStartTime) controls where in the stream a consumer starts reading, and ReplayPolicy (Instant, Original) controls how quickly stored messages are replayed. More important for slow consumers are MaxDeliver and MaxAckPending.
  - MaxAckPending: This is the key setting. It limits how many delivered-but-unacknowledged messages a consumer can have outstanding. Once that limit is reached, the server stops delivering new messages until some are acknowledged. This is the direct mechanism that prevents the server from overwhelming the consumer.
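The client-side drop behavior can be modeled the same way. The sketch below is again a simulation rather than the nats.go implementation: simulateDrops is a hypothetical helper, it assumes a one-at-a-time handler, and it counts the pending limit in messages only, whereas nats.go also enforces a byte limit per subscription.

```go
package main

import "fmt"

// simulateDrops models a core NATS subscription with a bounded pending queue.
// Messages arrive every pubMs milliseconds, the handler processes one at a
// time taking workMs each, and at most limit messages may wait in the queue.
// Arrivals that find the queue full are dropped, which is when a real client
// would report a slow consumer error. Only a message-count limit is modeled.
func simulateDrops(n, pubMs, workMs, limit int) (accepted, dropped int) {
	busyUntil := 0 // time at which the handler is next free
	queued := 0    // messages waiting in the pending queue
	for i := 0; i < n; i++ {
		arrive := i * pubMs
		// The handler pulls queued messages it was able to start by now.
		for queued > 0 && busyUntil <= arrive {
			queued--
			busyUntil += workMs
		}
		switch {
		case busyUntil <= arrive: // idle handler takes the message immediately
			busyUntil = arrive + workMs
			accepted++
		case queued < limit: // room left in the pending queue
			queued++
			accepted++
		default: // pending limit reached: the message is dropped
			dropped++
		}
	}
	return accepted, dropped
}

func main() {
	a, d := simulateDrops(1000, 10, 500, 100)
	fmt.Printf("limit 100: accepted %d, dropped %d\n", a, d)
	a, d = simulateDrops(1000, 10, 500, 512*1024)
	fmt.Printf("limit 512*1024: accepted %d, dropped %d\n", a, d)
}
```

With the demo’s rates and a pending limit of 100 messages, the model accepts 120 messages and drops the remaining 880. With a limit of 512 * 1024 messages (the value of nats.go’s DefaultSubPendingMsgsLimit) it drops nothing. In a real client you adjust the limits with sub.SetPendingLimits and check sub.Dropped() to see how many messages were lost.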
Let’s look at MaxAckPending in a JetStream consumer configuration:
```go
// Example JetStream consumer configuration (legacy nats.go JetStreamContext API).
// Assumes nc is a connected *nats.Conn and a stream named "updates" already exists.
js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}
streamName := "updates"
consumerName := "my-slow-consumer"
// Create a durable consumer
_, err = js.AddConsumer(streamName, &nats.ConsumerConfig{
	Durable:       consumerName,
	AckPolicy:     nats.AckExplicitPolicy, // Require explicit acknowledgments
	AckWait:       10 * time.Second,       // How long to wait for an ack before redelivering
	MaxDeliver:    10,                     // Max times to deliver a message before giving up
	MaxAckPending: 5,                      // The crucial setting: only 5 unacked messages allowed at once
})
if err != nil {
	log.Fatalf("Error adding consumer: %v", err)
}
```
In this JetStream configuration, the MaxAckPending: 5 setting means that once the consumer has received 5 messages but hasn’t acknowledged them, the JetStream server will stop sending more messages to this consumer until at least one of those 5 messages is acknowledged. This is the explicit "penalty" or, more accurately, the flow control mechanism.
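The bookkeeping behind MaxAckPending can be sketched as a sliding window. The type below is a hypothetical, stdlib-only model of what the server tracks for one consumer, not part of the nats.go API; it ignores AckWait expiry and redelivery to keep the window logic visible.

```go
package main

import "fmt"

// ackWindow is a toy model of the MaxAckPending rule for a single consumer:
// the server keeps delivering only while fewer than maxAckPending messages are
// unacknowledged. It is not the nats.go API and does not model AckWait expiry.
type ackWindow struct {
	maxAckPending int
	nextSeq       uint64          // next stream sequence to deliver
	pending       map[uint64]bool // delivered but not yet acknowledged
}

func newAckWindow(maxAckPending int) *ackWindow {
	return &ackWindow{maxAckPending: maxAckPending, nextSeq: 1, pending: make(map[uint64]bool)}
}

// deliver hands out up to want messages, stopping once the window is full.
func (w *ackWindow) deliver(want int) []uint64 {
	var out []uint64
	for len(out) < want && len(w.pending) < w.maxAckPending {
		seq := w.nextSeq
		w.nextSeq++
		w.pending[seq] = true
		out = append(out, seq)
	}
	return out
}

// ack removes a message from the window, freeing one slot.
func (w *ackWindow) ack(seq uint64) {
	delete(w.pending, seq)
}

func main() {
	w := newAckWindow(5)
	fmt.Println(w.deliver(10)) // [1 2 3 4 5]: the window fills up
	fmt.Println(w.deliver(10)) // []: nothing more until something is acked
	w.ack(1)
	fmt.Println(w.deliver(10)) // [6]: one ack frees one slot
}
```

With a limit of 5, the first request gets sequences 1 through 5, the next gets nothing, and acknowledging one message frees exactly one slot.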
To handle and mitigate slow consumer issues:
- Increase MaxAckPending: If your consumer is genuinely slow but you expect it to catch up eventually, and you don’t want delivery to pause so often, increase MaxAckPending. This lets more unacknowledged messages be in flight at once. It pays off when the consumer processes messages concurrently; a one-at-a-time handler gets no faster, and since every delivered message is on the AckWait clock, a large window in front of a sequential handler can trigger redeliveries.
  - Diagnosis: Compare the consumer’s processing rate with the publisher’s rate. If the number of unacknowledged messages (NumAckPending in the consumer info) climbs steadily and then sits at your MaxAckPending value, you’re hitting the limit.
  - Fix: In nats.ConsumerConfig, set MaxAckPending to a higher value, e.g., 100 or 1000, depending on your message size and processing capacity.
  - Why it works: It widens the window of unacknowledged messages, giving the consumer more breathing room before the server stops sending.
- Optimize Consumer Processing: This is the most fundamental solution. Make your consumer faster.
  - Diagnosis: Profile your consumer application. Identify bottlenecks in database calls, external API requests, or complex computations.
  - Fix: Refactor slow code, process independent messages concurrently within the consumer (e.g., with a pool of goroutines), optimize database queries, or add caching.
  - Why it works: A faster consumer acknowledges messages sooner, naturally staying below the MaxAckPending limit and keeping up with the publisher.
- Scale Consumers: If a single consumer instance can’t keep up, run multiple instances bound to the same durable JetStream consumer. With a pull consumer, every instance fetches from the same durable and JetStream spreads messages across them; a push consumer needs a deliver group (queue group) to get the same effect.
  - Diagnosis: High latency, steadily increasing unacknowledged messages, and CPU or memory saturation on a single consumer instance.
  - Fix: Deploy more instances of your consumer application, all using the same stream and durable consumer name.
  - Why it works: It distributes the workload so messages are processed in parallel.
- Adjust AckWait: If your processing is sometimes slow but usually fast, AckWait might be too short, causing redeliveries and unnecessary load. Conversely, if AckWait is too long, it takes longer to detect a consumer that has actually died.
  - Diagnosis: Frequent redeliveries of messages that are eventually processed successfully.
  - Fix: Increase AckWait in nats.ConsumerConfig to a value comfortably longer than your typical processing time, e.g., 60 * time.Second. For occasional long-running messages, the handler can instead call m.InProgress() to reset the ack timer.
  - Why it works: It gives the consumer more time to acknowledge messages before JetStream assumes it has failed and redelivers.
- Use MaxDeliver Wisely: If a consumer consistently fails to process certain messages (e.g., because of bad data), MaxDeliver prevents infinite redelivery loops.
  - Diagnosis: The same messages are redelivered over and over, dragging down overall consumer throughput.
  - Fix: Set MaxDeliver in nats.ConsumerConfig to a reasonable number (e.g., 3 or 5). JetStream has no built-in dead-letter queue setting; instead, when a message exhausts MaxDeliver, the server publishes an advisory on $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<STREAM>.<CONSUMER>, which you can subscribe to (or capture in a separate stream) to build one. A handler that knows a message can never succeed can also call m.Term() to stop its redelivery immediately.
  - Why it works: It caps how many times a single message is attempted, so persistently failing messages can’t exhaust resources on the consumer or server.
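Most of the advice above comes down to a little arithmetic. The sketch below uses Little’s law (in-flight = arrival rate × time per message) to estimate how much concurrency and how large a MaxAckPending a consumer needs, and how long the last message in a full window waits for its ack. Both helpers are hypothetical, assume a steady arrival rate and a fixed processing time, and ignore network and redelivery overhead.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// inFlightNeeded applies Little's law (L = λ × W): to keep up with rate
// messages per second when each takes perMsg to process, roughly rate×perMsg
// messages must be in progress at once. That is a floor for both the number
// of concurrent workers (across all instances) and MaxAckPending.
func inFlightNeeded(rate float64, perMsg time.Duration) int {
	return int(math.Ceil(rate * perMsg.Seconds()))
}

// worstCaseAckDelay estimates how long the last message in a full window waits
// before it is acknowledged, if maxAckPending messages are delivered at once
// and workers process them in parallel, perMsg each. AckWait should exceed it.
func worstCaseAckDelay(maxAckPending, workers int, perMsg time.Duration) time.Duration {
	rounds := (maxAckPending + workers - 1) / workers // ceiling division
	return time.Duration(rounds) * perMsg
}

func main() {
	// Numbers from the demo: 100 msgs/s published, 500 ms of work per message.
	fmt.Println("in flight needed:", inFlightNeeded(100, 500*time.Millisecond))
	// A sequential handler (1 worker) in front of a window of 1000 messages.
	fmt.Println("worst-case ack delay:", worstCaseAckDelay(1000, 1, 500*time.Millisecond))
}
```

For the demo’s 100 messages per second at 500 ms each, about 50 messages must be in flight at once, so a single sequential handler can never keep up no matter how large MaxAckPending is. The second helper shows the AckWait trap: a window of 1000 messages in front of one worker means the last message waits about 8m20s for its ack, far beyond a 10-second AckWait, so it would be redelivered long before it is processed.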
The most counterintuitive aspect of NATS flow control, particularly with JetStream, is that the "penalty" isn’t a punitive action by the server to slow down a misbehaving client. Instead, it’s a direct consequence of the server respecting the consumer’s stated capacity (MaxAckPending) and the acknowledgment protocol. The server simply stops pushing data when it knows the consumer can’t take more without becoming overloaded, based on the configuration you’ve provided. Core NATS is the exception: a client whose buffers overflow loses messages, and one that stops reading altogether can be disconnected by the server.
The next thing you’ll likely run into is handling message ordering guarantees when scaling consumers or dealing with the complexities of dead-letter queues.