NATS doesn’t actually batch your messages for you when you publish them asynchronously; it relies on your application to do it.
Let’s see this in action. Imagine you have a service that needs to send a lot of small status updates to a NATS subject like service.status.
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
log.Println("Connected to NATS")
// Simulate sending many small messages
startTime := time.Now()
for i := 0; i < 100000; i++ {
message := fmt.Sprintf("status update #%d", i)
err := nc.Publish("service.status", []byte(message))
if err != nil {
log.Printf("Error publishing message %d: %v", i, err)
// In a real app, you'd have more robust error handling
}
}
// Without flushing, messages might not be sent immediately
err = nc.Flush()
if err != nil {
log.Fatalf("Flush failed: %v", err)
}
duration := time.Since(startTime)
log.Printf("Sent 100,000 individual messages in %v", duration)
}
Running this code will likely show a rather disappointing throughput. Why? Because each nc.Publish call initiates a separate network write operation. Even though NATS is fast, the overhead of setting up and tearing down these individual operations, plus the TCP/IP stack’s own overhead for each packet, adds up quickly.
The core problem NATS async publishing solves is decoupling your application’s publishing logic from the network I/O. When you call nc.Publish, the message is typically buffered in the NATS client’s memory. It’s not sent to the NATS server immediately. The client will send these buffered messages when it deems it efficient, or when explicitly told to.
The NATS Go client, by default, has a buffer size and a timeout. If the buffer fills up, or if a certain amount of time passes without the buffer being emptied, the client will send the buffered messages. However, for very high-volume, low-latency scenarios with many small messages, relying solely on these defaults can leave performance on the table.
The real lever you have is explicitly managing the flushing of the client’s send buffer. The nc.Flush() method forces the client to send all currently buffered messages to the NATS server. When combined with judicious use of nc.PublishMsg (which can be slightly more efficient as it avoids some string-to-byte-slice conversions if you’re already working with nats.Msg objects), you can achieve much higher throughput.
Here’s how you’d batch manually for better throughput:
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
const (
batchSize = 1000 // Number of messages to buffer before flushing
maxOutstanding = 10000 // Max messages to have outstanding before pausing
)
func main() {
// Connect to NATS
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
log.Println("Connected to NATS")
// Simulate sending many small messages in batches
startTime := time.Now()
messageCount := 0
outstandingCount := 0
for i := 0; i < 100000; i++ {
message := fmt.Sprintf("status update #%d", i)
err := nc.Publish("service.status", []byte(message))
if err != nil {
log.Printf("Error publishing message %d: %v", i, err)
// In a real app, you'd have more robust error handling
}
messageCount++
outstandingCount++
// Flush when batch size is reached or when near max outstanding
if outstandingCount >= batchSize || outstandingCount >= maxOutstanding {
err = nc.Flush()
if err != nil {
log.Fatalf("Flush failed: %v", err)
}
outstandingCount = 0 // Reset count after flush
}
}
// Ensure any remaining messages are sent
if outstandingCount > 0 {
err = nc.Flush()
if err != nil {
log.Fatalf("Final flush failed: %v", err)
}
}
duration := time.Since(startTime)
log.Printf("Sent %d batched messages in %v", messageCount, duration)
}
In this batched version, we introduce two key concepts: batchSize and maxOutstanding.
batchSize(e.g., 1000) means we’ll try to accumulate 1000 messages in the client’s buffer before forcing aFlush. This amortizes the overhead ofFlushand network write operations across many messages.maxOutstanding(e.g., 10000) is a safety valve. The NATS client keeps track of how many messages are currently in flight (sent but not yet acknowledged by the server, if using acknowledgements). If this number gets too high, the client might start blocking beforePublishreturns. Flushing periodically, even if thebatchSizeisn’t met, prevents the client from getting overloaded and potentially blocking your publishing loop.
By controlling when nc.Flush() is called, you’re essentially controlling how NATS bundles your individual Publish calls into larger network writes. This is the primary mechanism for optimizing throughput for asynchronous publishing in NATS.
When you call nc.Flush(), the NATS client doesn’t just send the data; it also waits for confirmation from the NATS server that the buffer has been processed. This waiting is a crucial part of ensuring reliability but can also become a bottleneck if you flush too frequently. The nc.Drain() method is similar but also waits for all subscriptions to acknowledge messages before closing.
The next thing you’ll likely encounter when pushing throughput limits is the NATS server’s own processing capacity and network saturation.