NATS JetStream is not an upgrade to NATS Core; it’s a fundamentally different offering that adds persistence and guarantees to NATS’s existing high-performance message delivery.
Let’s see what that looks like in practice. Imagine you have a simple NATS publisher and subscriber.
Publisher (pub.go):
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
for i := 0; i < 10; i++ {
msg := []byte("Hello, NATS! #" + string(i))
if err := nc.Publish("greetings", msg); err != nil {
log.Printf("Error publishing: %v", err)
} else {
log.Printf("Published: %s", msg)
}
time.Sleep(500 * time.Millisecond)
}
}
Subscriber (sub.go):
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
sub, err := nc.Subscribe("greetings", func(msg *nats.Msg) {
log.Printf("Received on [%s]: %s", msg.Subject, string(msg.Data))
})
if err != nil {
log.Fatal(err)
}
log.Printf("Listening on subject greetings...")
// Keep the subscriber running
time.Sleep(10 * time.Second)
sub.Unsubscribe()
}
If you run pub.go and then sub.go, you’ll see the messages appear almost instantaneously. This is NATS Core: fire and forget, low latency, high throughput. But what happens if the subscriber isn’t running when the publisher sends a message? It’s gone. Lost forever.
This is where JetStream enters the picture. JetStream provides reliable message delivery by storing messages on disk (or in memory for ephemeral streams) before they are delivered to consumers.
To use JetStream, you first need to create a Stream. A stream is like a durable log where messages are appended. Then, you create a Consumer that reads from that stream.
Let’s adapt our example to use JetStream.
JetStream Publisher (js_pub.go):
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Create a JetStream context
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
// Define stream configuration
streamName := "GREETINGS"
// Check if stream exists, create if not
if _, err := js.StreamInfo(streamName); err != nil {
log.Printf("Stream %s not found, creating...", streamName)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"greetings"},
})
if err != nil {
log.Fatalf("Error creating stream: %v", err)
}
log.Printf("Stream %s created.", streamName)
}
for i := 0; i < 10; i++ {
msg := []byte("Hello, JetStream! #" + string(i))
// Publish to JetStream, which appends to the stream
if _, err := js.Publish("greetings", msg); err != nil {
log.Printf("Error publishing to JetStream: %v", err)
} else {
log.Printf("Published to JetStream: %s", msg)
}
time.Sleep(500 * time.Millisecond)
}
}
JetStream Subscriber (js_sub.go):
package main
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}
streamName := "GREETINGS"
consumerName := "MY_CONSUMER"
// Create or get the consumer
// This consumer will pull messages from the stream
// The durable consumer will resume from where it left off
// The ack_policy ensures messages are acknowledged after processing
_, err = js.AddConsumer(streamName, &nats.ConsumerConfig{
Name: consumerName,
Durable: consumerName, // Makes the consumer stateful
DeliverPolicy: nats.DeliverAll, // Start from the beginning of the stream
AckPolicy: nats.AckExplicit, // We must acknowledge messages
MaxDeliver: 10, // Max retries before considering message dead
SampleFrequency: "1s",
FilterSubject: "greetings",
DeliverSubject: "", // Defaults to a dynamically created subject
})
if err != nil {
log.Printf("Consumer %s already exists or error creating: %v", consumerName, err)
} else {
log.Printf("Consumer %s created for stream %s", consumerName, streamName)
}
// Subscribe to the consumer's deliver subject
sub, err := js.SubscribeSync("greetings", nats.BindStream(streamName), nats.BindConsumer(consumerName))
if err != nil {
log.Fatal(err)
}
log.Printf("Listening on consumer %s for stream %s...", consumerName, streamName)
for i := 0; i < 10; i++ {
msg, err := sub.NextMsg(10 * time.Second) // Wait for a message
if err != nil {
log.Printf("Error receiving message: %v", err)
continue
}
log.Printf("Received on [%s] (Seq: %d): %s", msg.Subject, msg.Sequence, string(msg.Data))
// Acknowledge the message
if err := msg.Ack(); err != nil {
log.Printf("Error acknowledging message: %v", err)
}
}
sub.Unsubscribe()
}
If you run js_pub.go and then js_sub.go, you’ll see the messages arrive. Now, if you stop js_sub.go and run js_pub.go again, then start js_sub.go, you’ll still receive all the messages. This is because JetStream persisted them. The Durable consumer configuration ensures that even if the subscriber restarts, it picks up where it left off.
The core difference lies in the guarantees. NATS Core offers at-most-once delivery (messages might be lost if consumers are down). JetStream provides at-least-once delivery (messages are guaranteed to be delivered, though potentially more than once, requiring idempotent processing) and exactly-once semantics under specific configurations.
JetStream introduces concepts like:
- Streams: Append-only logs of messages. They define the subjects messages are stored under and can have retention policies (e.g., keep last 1000 messages, keep messages for 24 hours).
- Consumers: Applications that read from streams. They can be durable (state is saved, resuming from the last acknowledged message) or ephemeral (state is lost on restart).
- Acknowledge Policy: How consumers confirm message processing.
AckExplicitmeans the consumer must manually acknowledge each message.AckAllmeans all messages up to a certain point are acknowledged. - Deliver Policy: When a consumer starts, what messages it should receive.
DeliverAllstarts from the beginning of the stream,DeliverLaststarts from the last message,DeliverNewstarts from messages published after the consumer was created.
The choice hinges on your application’s needs. For real-time, high-frequency, non-critical data where occasional loss is acceptable (e.g., telemetry, metrics), NATS Core is excellent. For critical data, event sourcing, or any scenario where message loss is unacceptable, JetStream is the solution.
JetStream’s ability to automatically manage message persistence and delivery state is a powerful abstraction, but it also means you’re dealing with disk I/O and internal state management, which adds a small overhead compared to NATS Core’s pure in-memory, network-bound operations. The trade-off is reliability for raw speed.
One key aspect of JetStream is how it handles message acknowledgments and redelivery. When a consumer receives a message with AckExplicit, it has a configurable amount of time to acknowledge it. If it fails to do so (due to a crash, network issue, or processing error), JetStream will eventually redeliver the message. This redelivery mechanism is what enables at-least-once semantics. The MaxDeliver setting on a consumer is crucial; if a message is redelivered more times than MaxDeliver without acknowledgment, JetStream will deem it undeliverable, potentially sending it to a Dead Letter Queue (DLQ) if configured.
The next logical step after mastering reliable messaging is to explore NATS’s request/reply patterns and how JetStream enhances them with request-based streams and acknowledgments.