Real-Time Data with MongoDB Change Streams

Watch for real-time changes in collections and databases using change streams and resume tokens.

What are Change Streams?

Change streams let applications react to data changes in MongoDB in real time, without polling. Built on the oplog (operations log), they deliver a reliable, ordered stream of change events for inserts, updates, replacements, and deletes. They require a replica set or sharded cluster.

Watching a Collection

Open a change stream on a collection using the watch() method:

// Watch all changes on the "orders" collection
const changeStream = db.orders.watch()

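// Iterate over change events. In mongosh, hasNext() blocks until the next
// event arrives; tryNext() returns null instead of waiting.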
while (changeStream.hasNext()) {
  const change = changeStream.next()
  printjson(change)
}

// Example change event:
// {
//   _id: { _data: "..." },          // resume token
//   operationType: "insert",
//   fullDocument: { _id: ..., ... },
//   ns: { db: "mydb", coll: "orders" },
//   clusterTime: Timestamp(...)
// }
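
Once events arrive, a consumer usually branches on operationType. The following is a minimal sketch in plain JavaScript of how that branching might look. The function name describeChange, the summary format, and the hand-written sample event are illustrative assumptions. documentKey and updateDescription are standard change event fields that the abbreviated example above omits.

```javascript
// Summarize a change event by its operationType.
// describeChange and its output format are hypothetical, for illustration only.
function describeChange(change) {
  // documentKey holds the _id of the affected document for
  // insert, update, replace, and delete events
  const id = change.documentKey ? change.documentKey._id : undefined
  switch (change.operationType) {
    case "insert":
      return `insert ${id}: ${JSON.stringify(change.fullDocument)}`
    case "update": {
      const fields = Object.keys(change.updateDescription.updatedFields)
      return `update ${id}: changed ${fields.join(", ")}`
    }
    case "replace":
      return `replace ${id}`
    case "delete":
      return `delete ${id}`
    default:
      // Other event types (e.g. drop, rename, invalidate) carry no documentKey
      return `other: ${change.operationType}`
  }
}

// Hand-written sample event (not captured from a real stream)
const sample = {
  operationType: "update",
  documentKey: { _id: 42 },
  updateDescription: { updatedFields: { status: "shipped" }, removedFields: [] }
}
console.log(describeChange(sample))  // prints: update 42: changed status
```

Inside the loop above, printjson(change) could be swapped for a call to a function like this one.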

Filtering Change Events

Pass an aggregation pipeline to watch() to filter which events you receive. The pipeline runs on the server, so events that do not match are never sent to your application:

// Only watch for insert and update operations
const changeStream = db.orders.watch([
  {
    $match: {
      operationType: { $in: ["insert", "update"] }
    }
  }
])

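// Watch for updates to high-value orders only.
// Update events carry fullDocument only when the stream is opened with
// fullDocument: "updateLookup"; without it this filter would never match.
const highValueStream = db.orders.watch([
  {
    $match: {
      operationType: "update",
      "fullDocument.total": { $gte: 1000 }
    }
  }
], { fullDocument: "updateLookup" })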
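
When filters depend on configuration, it can help to build the pipeline from plain parameters rather than hard-coding it. The sketch below shows one possible way to do that. buildChangePipeline and its operations and minTotal parameters are hypothetical names, not part of any MongoDB API.

```javascript
// Build a watch() pipeline from plain parameters.
// buildChangePipeline is a hypothetical helper for illustration.
function buildChangePipeline({ operations = [], minTotal = null } = {}) {
  const match = {}
  if (operations.length > 0) {
    match.operationType = { $in: operations }
  }
  if (minTotal !== null) {
    // Only events that carry fullDocument can match this condition:
    // inserts, replaces, and updates on a stream opened with
    // fullDocument: "updateLookup"
    match["fullDocument.total"] = { $gte: minTotal }
  }
  // An empty pipeline means "no filtering"
  return Object.keys(match).length > 0 ? [{ $match: match }] : []
}

const pipeline = buildChangePipeline({
  operations: ["insert", "update"],
  minTotal: 1000
})
console.log(JSON.stringify(pipeline))
```

In mongosh, the result could then be passed along as db.orders.watch(pipeline, { fullDocument: "updateLookup" }).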

Full Document on Update

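By default, update events include only the changed fields, in updateDescription. Set the fullDocument option to "updateLookup" to also receive the current version of the document. The lookup happens when the event is delivered, so the returned document may already reflect changes made after the update that triggered the event: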

// Include the full document in update events
const changeStream = db.orders.watch([], {
  fullDocument: "updateLookup"
})

// Now update events include:
// {
//   operationType: "update",
//   updateDescription: {
//     updatedFields: { status: "shipped" },
//     removedFields: []
//   },
//   fullDocument: { _id: ..., status: "shipped", items: [...], ... }
// }

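// MongoDB 6.0+ also supports fullDocumentBeforeChange.
// Pre-images must first be enabled on the collection
// (the changeStreamPreAndPostImages collection option).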
const changeStream = db.orders.watch([], {
  fullDocument: "updateLookup",
  fullDocumentBeforeChange: "whenAvailable"
})
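
If a consumer already holds a local copy of a document, it may be able to apply the delta in updateDescription itself instead of requesting a lookup. The following is a simplified sketch of that idea. applyUpdateDescription, setPath, and unsetPath are hypothetical helpers. The sketch assumes field names contain no literal dots, ignores truncatedArrays, and clones via JSON, which does not preserve types such as Date.

```javascript
// Set a value at a dotted path such as "shipping.carrier"
function setPath(obj, path, value) {
  const keys = path.split(".")
  let node = obj
  for (const key of keys.slice(0, -1)) {
    if (node[key] === null || typeof node[key] !== "object") node[key] = {}
    node = node[key]
  }
  node[keys[keys.length - 1]] = value
}

// Remove the field at a dotted path, if its parent exists
function unsetPath(obj, path) {
  const keys = path.split(".")
  let node = obj
  for (const key of keys.slice(0, -1)) {
    if (node[key] === null || typeof node[key] !== "object") return
    node = node[key]
  }
  delete node[keys[keys.length - 1]]
}

// Return a new document with updatedFields applied and removedFields deleted.
// The input document is left untouched.
function applyUpdateDescription(doc, updateDescription) {
  const copy = JSON.parse(JSON.stringify(doc))  // simple clone; loses Dates
  for (const [path, value] of Object.entries(updateDescription.updatedFields || {})) {
    setPath(copy, path, value)
  }
  for (const path of updateDescription.removedFields || []) {
    unsetPath(copy, path)
  }
  return copy
}

// Hand-written example data
const cached = { _id: 1, status: "pending", shipping: { carrier: "x", eta: "friday" }, note: "gift" }
const updated = applyUpdateDescription(cached, {
  updatedFields: { status: "shipped", "shipping.carrier": "ups" },
  removedFields: ["note"]
})
console.log(updated)
```

This trades a server round trip for the risk of drift: if an event is missed, the local copy stays stale until it is reloaded.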

Resume Tokens

Each change event includes a resume token in its _id field. If your application disconnects, you can resume the stream from where you left off, provided the oplog still contains the entry that the token refers to:

// Save the resume token from the last processed event
let lastResumeToken = null

const changeStream = db.orders.watch()
while (changeStream.hasNext()) {
  const change = changeStream.next()
  processChange(change)
  lastResumeToken = change._id  // save the resume token
}

// Later, resume from where we left off
const resumedStream = db.orders.watch([], {
  resumeAfter: lastResumeToken
})

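// Or use startAfter. Like resumeAfter, it starts with the first event after
// the token, but it can also open a new stream after an invalidate event
// (for example, after the collection is dropped). Use one option or the
// other, not both.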
const resumedStream2 = db.orders.watch([], {
  startAfter: lastResumeToken
})
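
A resume token only helps after a restart if it is saved somewhere durable, and only after the corresponding event has been fully processed. The sketch below illustrates that ordering with simulated events. createMemoryStore, watchOptionsFrom, and processBatch are hypothetical names, the store is in-memory for the sake of the example (in practice it might be a file or a collection), and the token values are placeholders, since real tokens are opaque.

```javascript
// In-memory stand-in for durable token storage (hypothetical interface)
function createMemoryStore() {
  let token = null
  return {
    load: () => token,
    save: (t) => { token = t }
  }
}

// Build the options object for watch() from the saved checkpoint
function watchOptionsFrom(store) {
  const token = store.load()
  // No saved token: open a fresh stream. Otherwise resume after it.
  return token === null ? {} : { resumeAfter: token }
}

// Process events in order, advancing the checkpoint only on success
function processBatch(events, handler, store) {
  for (const change of events) {
    handler(change)         // may throw; the checkpoint is then not advanced
    store.save(change._id)  // change._id is the resume token
  }
}

// Simulated events with placeholder tokens
const store = createMemoryStore()
const events = [
  { _id: { _data: "t1" }, operationType: "insert" },
  { _id: { _data: "t2" }, operationType: "update" },
  { _id: { _data: "t3" }, operationType: "delete" }
]

try {
  processBatch(events, (change) => {
    if (change.operationType === "delete") throw new Error("simulated failure")
  }, store)
} catch (err) {
  // The checkpoint still points at the last successfully processed event
}

console.log(watchOptionsFrom(store))  // options that resume after token "t2"
```

Because the token is saved after the handler runs, a crash between the two steps replays that event on restart, so handlers written this way should tolerate seeing the same event twice.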

Database and Cluster-Level Streams

You can watch an entire database or the whole cluster for changes across all collections:

// Watch all collections in a database
const dbStream = db.watch()

// Watch the entire cluster (all databases)
const clusterStream = db.getMongo().watch()

// Filter by collection name in a database-level stream
const filteredDbStream = db.watch([
  {
    $match: {
      "ns.coll": { $in: ["orders", "payments"] }
    }
  }
])
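
With a database- or cluster-level stream, a single consumer receives events from many collections, so it typically dispatches on the ns field. The following is a minimal sketch of such a dispatcher. createRouter and the handler keys are hypothetical, and the events are hand-written rather than read from a real stream.

```javascript
// Route events to per-collection handlers keyed by "db.coll".
// createRouter is a hypothetical helper for illustration.
function createRouter(handlers) {
  return function route(change) {
    // Some events carry no collection name (e.g. dropDatabase, invalidate)
    if (!change.ns || !change.ns.coll) return false
    const handler = handlers[`${change.ns.db}.${change.ns.coll}`]
    if (!handler) return false
    handler(change)
    return true
  }
}

const seen = []
const route = createRouter({
  "mydb.orders": (c) => seen.push(`order:${c.operationType}`),
  "mydb.payments": (c) => seen.push(`payment:${c.operationType}`)
})

route({ operationType: "insert", ns: { db: "mydb", coll: "orders" } })
route({ operationType: "update", ns: { db: "mydb", coll: "payments" } })
route({ operationType: "insert", ns: { db: "mydb", coll: "logs" } })  // no handler, ignored
console.log(seen)  // prints: [ 'order:insert', 'payment:update' ]
```

Filtering on "ns.coll" in the pipeline, as above, still reduces traffic; the router only decides what to do with the events that do arrive.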

Common Use Cases

  • Real-time notifications: Push updates to users when their data changes.
  • Cache invalidation: Refresh cached data when the source document is modified.
  • Data synchronization: Keep secondary systems in sync with MongoDB.
  • Audit logging: Record every change to critical collections.
  • Event-driven architecture: Trigger downstream processing on data changes.
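
As one illustration of the cache invalidation case, the sketch below evicts entries from an in-process Map when a change event reports that the source document was modified or removed. invalidateFromChange is a hypothetical helper, and the sketch assumes cache keys are the string form of the document's _id.

```javascript
// Evict a cached document when its source document changes.
// Returns true if an entry was actually removed.
function invalidateFromChange(cache, change) {
  const evicting = ["update", "replace", "delete"]
  if (!evicting.includes(change.operationType)) return false
  // Map compares object keys by identity, so use a string form of the _id
  return cache.delete(String(change.documentKey._id))
}

// Hand-written example data
const cache = new Map([
  ["1", { _id: 1, status: "pending" }],
  ["2", { _id: 2, status: "pending" }]
])

invalidateFromChange(cache, { operationType: "update", documentKey: { _id: 1 } })
invalidateFromChange(cache, { operationType: "insert", documentKey: { _id: 3 } })  // nothing to evict
console.log([...cache.keys()])  // prints: [ '2' ]
```

Evicting instead of updating in place keeps the consumer simple: the next read repopulates the entry from MongoDB.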

Key Takeaways

  • Change streams provide real-time, ordered notifications of data changes.
  • Use aggregation pipelines in watch() to filter events server-side.
  • Resume tokens let you restart streams without missing events, as long as the oplog still covers the token.
  • Streams work at collection, database, or cluster level and require a replica set or sharded cluster.
