MongoDB’s Change Streams let you tap into a real-time stream of data changes happening within your database.

Imagine you’re building a dashboard that needs to update instantly whenever a new order comes in, or a user profile is modified. Instead of constantly polling the database (which is inefficient and slow), Change Streams act like a persistent notification system. When data changes in a collection—an insert, update, or delete—MongoDB publishes an event describing that change. Your application can then subscribe to these events and react immediately.

Let’s see this in action. Here’s a simplified Node.js example that uses the official MongoDB driver to watch for changes in an orders collection:

const { MongoClient } = require('mongodb');

async function watchOrders() {
    const uri = "mongodb://localhost:27017";
    const client = new MongoClient(uri);

    try {
        await client.connect();
        console.log("Connected successfully to server");

        const database = client.db("mydatabase");
        const collection = database.collection("orders");

        // Create a change stream cursor
        const changeStream = collection.watch();

        console.log("Watching for changes in the 'orders' collection...");

        // Listen for change events
        changeStream.on('change', (change) => {
            console.log("Change detected:", change);
            // React to the change, e.g., update a UI, trigger another process
            if (change.operationType === 'insert') {
                console.log("New order inserted:", change.fullDocument);
            } else if (change.operationType === 'update') {
                console.log("Order updated:", change.documentKey._id);
            } else if (change.operationType === 'delete') {
                console.log("Order deleted:", change.documentKey._id);
            }
        });

        // Keep the process running to continue watching
        process.stdin.resume();

    } catch (err) {
        console.error("Error:", err);
    } finally {
        // In a real app, you'd manage connection closing more gracefully
        // For this example, we'll leave it open until the process exits.
    }
}

watchOrders();

To make this work, your MongoDB deployment must be a replica set (or a sharded cluster, whose shards are themselves replica sets). Change Streams rely on the oplog (operations log), which records every write so that it can be replicated to other members. A standalone MongoDB instance has no oplog and therefore cannot support Change Streams.
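One way to fail fast on an unsupported deployment is to inspect the server's `hello` response before calling watch(). The sketch below is a minimal, hypothetical helper (the name `supportsChangeStreams` is not part of the driver). It assumes that replica set members report a `setName` field and that a mongos router reports `msg: 'isdbgrid'`.

```javascript
// Hypothetical helper: decides from a `hello` response whether the
// deployment looks like a replica set member or a mongos router.
function supportsChangeStreams(hello) {
    const isReplicaSetMember = typeof hello.setName === 'string';
    const isMongos = hello.msg === 'isdbgrid';
    return isReplicaSetMember || isMongos;
}

// Usage sketch, assuming a connected MongoClient named `client`:
//   const hello = await client.db('admin').command({ hello: 1 });
//   if (!supportsChangeStreams(hello)) {
//       throw new Error('Change Streams need a replica set or sharded cluster');
//   }
```

The check is only advisory. The server still rejects watch() on a standalone instance, so treat this as a friendlier error message rather than a guarantee.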

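The core mechanism is a watch() operation on a collection or database. This returns a cursor that yields change events. Each event is a document describing the operation: its type (insert, update, replace, delete, plus others such as drop, rename, and invalidate), the documentKey of the affected document, and sometimes the full document before or after the change, depending on the operation and configuration. Insert and replace events carry fullDocument by default. Update events carry only an updateDescription unless you open the stream with the fullDocument: 'updateLookup' option, and the document as it was before the change is available only on MongoDB 6.0+ when pre-images are enabled for the collection.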
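To make the event shape concrete, here is a small sketch of a function that summarizes an event based on its operationType. The helper name `describeChange` and the summary format are illustrative only. The field names (`operationType`, `documentKey`, `updateDescription.updatedFields`) are the ones change events use.

```javascript
// Hypothetical helper: turns a change event into a one-line summary.
function describeChange(change) {
    const type = change.operationType;
    // Collection-level events such as 'drop' have no documentKey.
    const id = change.documentKey ? change.documentKey._id : undefined;

    switch (type) {
        case 'insert':
        case 'replace':
        case 'delete':
            return `${type} ${id}`;
        case 'update': {
            // updateDescription lists the fields that were set by the update.
            const updated = (change.updateDescription || {}).updatedFields || {};
            return `update ${id}: changed ${Object.keys(updated).join(', ')}`;
        }
        case 'invalidate':
            return 'stream invalidated';
        default:
            return `unhandled event type: ${type}`;
    }
}
```

A function like this can be passed straight to the listener, for example `changeStream.on('change', (c) => console.log(describeChange(c)))`.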

You can fine-tune what events you receive. For instance, you can specify a pipeline of aggregation operators to filter or transform the change events before they’re sent to your application. This is incredibly powerful for reducing the volume of data your application needs to process.

// Example of filtering for only 'insert' operations on orders over $100
const changeStreamFiltered = collection.watch(
    [
        { $match: { "operationType": "insert" } },
        { $match: { "fullDocument.amount": { $gt: 100 } } }
    ]
);
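The filter above can be parameterized and combined with a transform stage. The following sketch builds such a pipeline. The function name `buildLargeOrderPipeline` and the `amount` threshold are illustrative assumptions. The `$project` stage uses inclusion, so the event's `_id` (the resume token) stays in the output. Removing `_id` would leave the stream unable to resume.

```javascript
// Hypothetical helper: builds a change stream pipeline that keeps only
// inserts above a minimum amount and trims each event to a few fields.
function buildLargeOrderPipeline(minAmount) {
    return [
        // Both conditions in one $match stage; equivalent to two stages in a row.
        {
            $match: {
                operationType: 'insert',
                'fullDocument.amount': { $gt: minAmount }
            }
        },
        // Inclusion projection: _id is kept implicitly, so resuming still works.
        {
            $project: {
                operationType: 1,
                documentKey: 1,
                'fullDocument.amount': 1
            }
        }
    ];
}

// Usage sketch, assuming `collection` is a driver Collection:
//   const changeStream = collection.watch(buildLargeOrderPipeline(100));
```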

You can also watch an entire database, not just a single collection, by calling watch() on the Db object: client.db("mydatabase").watch(). This will emit changes from all collections within that database.
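When one stream covers several collections, each event's `ns` field (with `db` and `coll`) tells you where the change happened. A minimal sketch of dispatching on it follows. The `routeChange` helper and the handler map are hypothetical names introduced for illustration.

```javascript
// Hypothetical helper: dispatches a database-level change event to a
// per-collection handler. `handlers` is a Map of collection name -> function.
function routeChange(change, handlers) {
    // Some events (for example dropDatabase) have a namespace without `coll`.
    const coll = change.ns ? change.ns.coll : undefined;
    const handler = handlers.get(coll);
    if (!handler) {
        return false; // No handler registered for this collection.
    }
    handler(change);
    return true;
}

// Usage sketch, assuming `client` is a connected MongoClient:
//   const handlers = new Map([['orders', handleOrder], ['users', handleUser]]);
//   client.db('mydatabase').watch().on('change', (c) => routeChange(c, handlers));
```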

The ability to resume a change stream is crucial for building robust applications. If your application goes down or the network connection is interrupted, you don’t want to miss changes. Every change event carries a resume token in its _id field. You can store this token and, upon reconnecting, use it to start watching from the exact point where you left off.

let lastResumeToken = null; // Store this token between sessions

// ... inside your change stream listener ...
changeStream.on('change', (change) => {
    console.log("Change detected:", change);
    lastResumeToken = change._id; // Update the token
    // ... react to change ...
});

// To resume (only pass resumeAfter once a token has actually been saved):
const resumedChangeStream = collection.watch(
    [], // Optional aggregation pipeline
    lastResumeToken ? { resumeAfter: lastResumeToken } : {}
);

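This resumeAfter option, when provided with a valid token, makes the stream pick up immediately after the event the token belongs to. No changes are lost during downtime as long as the oplog still contains the entry the token points to. If the application stays down long enough for that entry to roll off the oplog, the resume fails and you need a full resynchronization. Treat the token itself as an opaque BSON document: it encodes a position in the oplog, and its internal format is not meant to be parsed.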
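For the token to survive a process restart it has to be stored somewhere durable. The sketch below keeps it in a local JSON file. The helper names and the file-based storage are illustrative assumptions, and it assumes the token is plain JSON-serializable (a document with a string `_data` field, as current server versions emit). If that does not hold in your environment, a BSON-aware serializer is the safer choice.

```javascript
const fs = require('fs');

// Hypothetical helper: writes the latest resume token to disk.
function saveResumeToken(path, token) {
    fs.writeFileSync(path, JSON.stringify(token));
}

// Hypothetical helper: returns watch() options, with resumeAfter set
// only when a previously saved token exists.
function loadWatchOptions(path) {
    if (!fs.existsSync(path)) {
        return {}; // First run: start from the current point in time.
    }
    const token = JSON.parse(fs.readFileSync(path, 'utf8'));
    return { resumeAfter: token };
}

// Usage sketch, assuming `collection` is a driver Collection:
//   const stream = collection.watch([], loadWatchOptions('./orders.token.json'));
//   stream.on('change', (change) => {
//       // ... process the change first, then record progress ...
//       saveResumeToken('./orders.token.json', change._id);
//   });
```

Saving the token after the event has been processed means a crash can cause an event to be handled twice, but not skipped, so handlers should be idempotent.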

You can also specify startAtOperationTime (a BSON Timestamp) to begin watching from a specific point in time, which is useful for initial synchronization or testing. As with resume tokens, that point must still be within the range covered by the oplog.

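The number of change streams you can keep open is bounded by resources rather than by a dedicated change stream setting. Each open stream holds a cursor on the server and typically occupies a connection while it waits for the next event. If you open more streams than your driver's connection pool can serve, expect delayed notifications, so size the pool accordingly or share one stream among several consumers.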

If you’re watching a sharded collection, you open the stream through mongos, which collects events from every shard. The events are then merged and ordered by cluster time, so you see a single, consistently ordered stream. The resume token reflects this merged timeline.

The actual mechanism relies on the replica set’s oplog. When you call watch(), MongoDB creates a special cursor that reads from this oplog. The cursor advances as new operations are appended to the oplog. The events emitted by the change stream are essentially filtered and formatted versions of the oplog entries relevant to your watched collection or database.

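A common pitfall is forgetting to handle the invalidate event. This event signifies that the change stream is no longer valid because the thing being watched has gone away: for a collection-level stream, the collection was dropped or renamed, or its database was dropped. Ordinary replica set events such as elections do not invalidate a stream, and the driver normally resumes through them on its own. After an invalidate the cursor is closed, and resumeAfter does not accept an invalidate event's token. Your application must be prepared to handle this, typically by opening a new stream, using the startAfter option (MongoDB 4.2+) if it should continue from the invalidate event.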
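A minimal sketch of that handling is shown below. The function name `watchWithReopen` is hypothetical. It assumes a collection-like object whose watch() returns an event emitter with a close() method, as the Node.js driver's ChangeStream does. Whether reopening is the right reaction to a dropped collection depends on your application.

```javascript
// Hypothetical helper: watches a collection and, on an 'invalidate'
// event, opens a fresh stream that starts after the invalidate token.
function watchWithReopen(collection, onChange, options = {}) {
    const stream = collection.watch([], options);

    stream.on('change', (change) => {
        if (change.operationType === 'invalidate') {
            // The old cursor is finished; close it and ignore close errors.
            Promise.resolve(stream.close()).catch(() => {});
            // startAfter (not resumeAfter) is the option that accepts
            // the token of an invalidate event.
            watchWithReopen(collection, onChange, { startAfter: change._id });
            return;
        }
        onChange(change);
    });

    return stream;
}

// Usage sketch, assuming `collection` is a driver Collection:
//   watchWithReopen(collection, (change) => console.log(change.operationType));
```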

The next challenge you’ll likely face is managing the state of your application based on these incoming changes, especially in distributed systems.
