NATS connection draining is how a client shuts down gracefully: messages it has already received are processed and publishes it has buffered are flushed to the server before the connection closes, preventing data loss.
Let’s see it in action. Imagine a publisher sending messages to a NATS subject and a subscriber processing them.
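// Publisher example
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// closed is signalled once the connection has fully closed, which is how
	// the Go client reports that a drain has finished.
	closed := make(chan struct{})
	nc, err := nats.Connect("nats://localhost:4222",
		nats.ClosedHandler(func(*nats.Conn) { close(closed) }))
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 100; i++ {
		// Format the counter as decimal text; string(i) would instead
		// produce the character whose code point is i.
		msg := []byte(fmt.Sprintf("Hello #%d", i))
		if err := nc.Publish("updates", msg); err != nil {
			log.Fatal(err)
		}
		log.Printf("Published: %s", msg)
		time.Sleep(100 * time.Millisecond)
	}
	log.Println("Finished publishing. Initiating drain...")
	// This is the key part. Drain returns immediately; the connection
	// closes itself once the drain has finished.
	if err := nc.Drain(); err != nil {
		log.Fatal(err)
	}
	<-closed
	log.Println("Drain complete. Publisher exiting.")
}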
// Subscriber example
package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()
	sub, err := nc.SubscribeSync("updates")
	if err != nil {
		log.Fatal(err)
	}
	log.Println("Subscribed to updates. Waiting for messages...")
	for {
		msg, err := sub.NextMsg(10 * time.Second) // Use a timeout to allow eventual exit
		if err != nil {
			if err == nats.ErrTimeout {
				log.Println("No more messages, exiting.")
				break
			}
			log.Fatal(err)
		}
		log.Printf("Received: %s", msg.Data)
		// Simulate some processing time
		time.Sleep(500 * time.Millisecond)
	}
	log.Println("Subscriber exiting.")
}
When the publisher calls nc.Drain(), the client library begins a graceful shutdown that is driven from the client side; the server has no special "drain" mode for the connection. The client first drains its subscriptions: it unsubscribes each one, so the server stops routing new messages to it, and then lets the application finish handling the messages that were already delivered. Next it flushes any publishes still sitting in its outbound buffer to the server, and finally it closes the connection. A pure publisher like the one above has no subscriptions, so for it the drain amounts to flushing buffered publishes and closing. Messages the server has already received are still forwarded to whichever subscribers are connected.
The core problem NATS connection draining solves is preventing message loss during application restarts or deployments. Without draining, if a publisher or subscriber disconnects abruptly, any messages it was in the process of sending or receiving could be lost. NATS draining provides a mechanism for clients to signal their intent to shut down gracefully, allowing for the completion of in-flight operations.
Internally, when a client calls Drain(), the connection moves through a sequence of draining states. First the subscriptions are drained: each one is unsubscribed at the server, and the client keeps processing incoming messages until its internal pending queue is empty. Then the publish side is drained: new publish calls are rejected, and whatever remains in the outbound buffer is flushed to the server. Only after both phases finish, or the drain timeout expires, does the client close the connection. The server is not asked to hold the connection open; it sees ordinary unsubscribe requests, the remaining published data, and then a disconnect.
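As a rough model of that client-side sequence, the sketch below walks a toy connection through its phases using only the standard library. The `toyConn` type, its fields, and the state names are hypothetical and exist only for illustration; this is not how nats.go is implemented, just the ordering of steps a drain is meant to follow.

```go
package main

import "fmt"

// state lists the phases a draining connection passes through.
// The names are illustrative, not taken from any client library.
type state int

const (
	connected state = iota
	drainingSubs
	drainingPubs
	closed
)

// toyConn is a hypothetical stand-in for a client connection.
type toyConn struct {
	st      state
	inbox   []string // messages delivered to the client but not yet handled
	outbox  []string // publishes buffered but not yet written to the server
	handled []string // messages the application has finished processing
	flushed []string // publishes that reached the server
}

// publish buffers a message unless the publish side is draining or closed.
func (c *toyConn) publish(m string) error {
	if c.st >= drainingPubs {
		return fmt.Errorf("connection is draining or closed")
	}
	c.outbox = append(c.outbox, m)
	return nil
}

// drain processes what was already received, flushes what was already
// buffered, and only then marks the connection closed.
func (c *toyConn) drain() {
	c.st = drainingSubs // from here on, no new deliveries are accepted
	c.handled = append(c.handled, c.inbox...)
	c.inbox = nil

	c.st = drainingPubs // from here on, publish is rejected
	c.flushed = append(c.flushed, c.outbox...)
	c.outbox = nil

	c.st = closed
}

func main() {
	c := &toyConn{inbox: []string{"a", "b"}}
	_ = c.publish("x")
	c.drain()
	// Prints: [a b] [x] true
	fmt.Println(c.handled, c.flushed, c.publish("y") != nil)
}
```

Nothing that was already received or buffered is dropped, and nothing new is accepted once the corresponding phase has started.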
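You control the draining process through the Drain() method on the nats.Conn object. In the Go client this method does not block: it starts the drain and returns, and the connection closes in the background once draining finishes. To wait for completion, register a closed handler with the nats.ClosedHandler option and block until it fires. To cap how long a drain may take, pass the nats.DrainTimeout(duration) option to nats.Connect; it is a connection option rather than a method on nats.Conn, and it defaults to 30 seconds.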
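The nc.Drain() call on the publisher side doesn’t wait for subscribers to process the messages, or even to receive them; it only waits until the client's buffered publishes have been flushed to the NATS server. Core NATS delivery is at-most-once, so the server forwards each message to whichever subscribers are connected at that moment and does not store it for later. The subscriber’s NextMsg call with a timeout is what determines how long it will wait for further messages before deciding there are no more. If a subscriber is slow, it may still be working through its backlog after the publisher’s drain completes; those messages wait in the subscriber’s client-side pending buffer until it gets to them, provided the buffer’s limits are not exceeded.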
The next concept you’ll encounter is how to handle persistent message queues and guaranteed delivery using NATS JetStream.