What are Change Streams?
Change streams allow applications to react to real-time data changes in MongoDB without polling. Built on top of the oplog (operations log), change streams provide a reliable, ordered stream of change events for inserts, updates, deletes, and replacements. They require a replica set or sharded cluster.
Watching a Collection
Open a change stream on a collection using the watch() method:
// Watch all changes on the "orders" collection
const changeStream = db.orders.watch()
// Iterate over change events
while (changeStream.hasNext()) {
  const change = changeStream.next()
  printjson(change)
}
// Example change event:
// {
//   _id: { _data: "..." }, // resume token
//   operationType: "insert",
//   fullDocument: { _id: ..., ... },
//   ns: { db: "mydb", coll: "orders" },
//   clusterTime: Timestamp(...)
// }
Filtering Change Events
Pass an aggregation pipeline to watch() to filter which events you receive. The pipeline runs on the server, so events that do not match are never sent to the client:
// Only watch for insert and update operations
const changeStream = db.orders.watch([
  {
    $match: {
      operationType: { $in: ["insert", "update"] }
    }
  }
])
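When several consumers need slightly different filters, the pipeline can be assembled from plain data before it is handed to watch(). The sketch below is only an illustration: buildChangeFilter and its parameters are hypothetical names, not part of MongoDB, and it assumes a single $match stage is enough.

```javascript
// Hypothetical helper (not a MongoDB API): builds a watch() pipeline
// from a list of operation types plus optional extra $match conditions.
function buildChangeFilter(operationTypes = [], extraMatch = {}) {
  const match = { ...extraMatch };
  if (operationTypes.length > 0) {
    match.operationType = { $in: operationTypes };
  }
  // An empty pipeline means "no filtering", which watch() accepts
  return Object.keys(match).length > 0 ? [{ $match: match }] : [];
}

const pipeline = buildChangeFilter(["insert", "update"]);
console.log(JSON.stringify(pipeline));
// [{"$match":{"operationType":{"$in":["insert","update"]}}}]
```

In mongosh the result could then be passed along as db.orders.watch(pipeline).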
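// Watch for updates to high-value orders only.
// Update events carry no fullDocument unless updateLookup is enabled,
// so the option is required for this filter to match anything.
const highValueStream = db.orders.watch([
  {
    $match: {
      operationType: "update",
      "fullDocument.total": { $gte: 1000 }
    }
  }
], { fullDocument: "updateLookup" })
Full Document on Update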
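By default, update events include only the changed fields (in updateDescription). Set the fullDocument option to "updateLookup" to also receive the complete document. The lookup returns the current majority-committed version, so it can already reflect later changes made after the update that triggered the event: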
// Include the full document in update events
const changeStream = db.orders.watch([], {
  fullDocument: "updateLookup"
})
// Now update events include:
// {
//   operationType: "update",
//   updateDescription: {
//     updatedFields: { status: "shipped" },
//     removedFields: []
//   },
//   fullDocument: { _id: ..., status: "shipped", items: [...], ... }
// }
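If a consumer already holds a copy of the document, the updateDescription alone can be enough to bring that copy up to date without a lookup. The following is a minimal sketch under stated assumptions: applyUpdateDescription and resolveParent are hypothetical helpers, nested fields are assumed to arrive as dotted paths such as "shipping.city", field names are assumed not to contain dots themselves, and truncatedArrays is ignored.

```javascript
// Hypothetical helpers (not part of MongoDB): apply an update event's
// updateDescription to a locally held copy of the document.

// Walk a dotted path such as "shipping.city" and return [parent, lastKey].
function resolveParent(target, path, createMissing) {
  const keys = path.split(".");
  let node = target;
  for (const key of keys.slice(0, -1)) {
    if (typeof node[key] !== "object" || node[key] === null) {
      if (!createMissing) return [null, null];
      node[key] = {};
    }
    node = node[key];
  }
  return [node, keys[keys.length - 1]];
}

function applyUpdateDescription(doc, updateDescription) {
  // JSON round-trip is a simplification: fine for plain data, lossy for BSON types
  const copy = JSON.parse(JSON.stringify(doc));
  const { updatedFields = {}, removedFields = [] } = updateDescription;
  for (const [path, value] of Object.entries(updatedFields)) {
    const [parent, key] = resolveParent(copy, path, true);
    parent[key] = value;
  }
  for (const path of removedFields) {
    const [parent, key] = resolveParent(copy, path, false);
    if (parent !== null) delete parent[key];
  }
  return copy;
}

const order = { _id: 1, status: "pending", shipping: { city: "Oslo", note: "fragile" } };
const updated = applyUpdateDescription(order, {
  updatedFields: { status: "shipped", "shipping.city": "Bergen" },
  removedFields: ["shipping.note"]
});
console.log(JSON.stringify(updated));
// {"_id":1,"status":"shipped","shipping":{"city":"Bergen"}}
```

The input document is left untouched, which keeps the helper safe to use on shared state. When the local copy may be stale or missing, updateLookup remains the simpler choice.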
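// MongoDB 6.0+ also supports fullDocumentBeforeChange.
// Pre-images must first be enabled on the collection
// (the changeStreamPreAndPostImages collection option).
const preImageStream = db.orders.watch([], {
  fullDocument: "updateLookup",
  fullDocumentBeforeChange: "whenAvailable"
})
Resume Tokens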
Each change event includes a resume token. If your application disconnects, you can resume the stream from where you left off:
// Save the resume token from the last processed event
let lastResumeToken = null
const changeStream = db.orders.watch()
while (changeStream.hasNext()) {
  const change = changeStream.next()
  processChange(change)
  lastResumeToken = change._id // save the resume token
}
// Later, resume from where we left off
const resumedStream = db.orders.watch([], {
  resumeAfter: lastResumeToken
})
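The save-then-resume pattern above can be wrapped in a small loop that stores the token only after an event has been handled, so a crash replays the failed event instead of skipping it (at-least-once delivery). This is a sketch, not a definitive implementation: createTokenStore, consume and fakeCollection are hypothetical names, the store is in-memory where a real one would persist the token, and fakeCollection merely imitates the watch()/hasNext()/next() shape so the example runs outside mongosh.

```javascript
// Hypothetical in-memory token store; a real one would persist the token.
function createTokenStore() {
  let token = null;
  return {
    load: () => token,
    save: (t) => { token = t; }
  };
}

// Processes events and saves each resume token only after its handler succeeds.
function consume(collection, tokenStore, handleChange, pipeline = []) {
  const saved = tokenStore.load();
  const options = saved ? { resumeAfter: saved } : {};
  const stream = collection.watch(pipeline, options);
  while (stream.hasNext()) {
    const change = stream.next();
    handleChange(change);
    tokenStore.save(change._id);
  }
}

// Stand-in for a real collection, for demonstration only.
function fakeCollection(events) {
  return {
    watch(pipeline, options = {}) {
      // Start just past the event whose token was saved
      let i = options.resumeAfter
        ? events.findIndex((e) => e._id._data === options.resumeAfter._data) + 1
        : 0;
      return { hasNext: () => i < events.length, next: () => events[i++] };
    }
  };
}

const events = ["a", "b", "c"].map((d) => ({ _id: { _data: d }, operationType: "insert" }));
const store = createTokenStore();
const seen = [];
let failOnce = true;
const handler = (change) => {
  if (change._id._data === "b" && failOnce) {
    failOnce = false;
    throw new Error("simulated crash");
  }
  seen.push(change._id._data);
};

try {
  consume(fakeCollection(events), store, handler);
} catch (err) {
  // The simulated crash ends the first run; the token for "a" is already saved
}
consume(fakeCollection(events), store, handler);
console.log(seen.join(","));
// a,b,c
```

Because the token is written after the handler returns, handlers should be idempotent: an event may be delivered again after a failure.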
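// Or use startAfter. It also continues after the token's event but,
// unlike resumeAfter, can start a new stream after an invalidate event
// (for example, after the collection was dropped or renamed)
const resumedStream2 = db.orders.watch([], {
  startAfter: lastResumeToken
})
Database and Cluster-Level Streams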
You can watch an entire database or the whole cluster for changes across all collections:
// Watch all collections in a database
const dbStream = db.watch()
// Watch the entire cluster (all databases)
const clusterStream = db.getMongo().watch()
// Filter by collection name in a database-level stream
const filteredDbStream = db.watch([
  {
    $match: {
      "ns.coll": { $in: ["orders", "payments"] }
    }
  }
])
Common Use Cases
- Real-time notifications: Push updates to users when their data changes.
- Cache invalidation: Refresh cached data when the source document is modified.
- Data synchronization: Keep secondary systems in sync with MongoDB.
- Audit logging: Record every change to critical collections.
- Event-driven architecture: Trigger downstream processing on data changes.
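To make the cache invalidation case concrete, here is one way a consumer might apply change events to an in-memory cache. It is a sketch under assumptions: applyChangeToCache is a hypothetical function, the cache is a plain Map keyed by the stringified documentKey._id, and update events without a fullDocument simply evict the entry so the next read reloads it.

```javascript
// Hypothetical handler (not a MongoDB API): keeps a Map-based cache in step
// with change events from a collection-level stream.
function applyChangeToCache(cache, change) {
  if (!change.documentKey) return cache; // e.g. drop or invalidate events
  const key = String(change.documentKey._id);
  switch (change.operationType) {
    case "insert":
    case "replace":
      cache.set(key, change.fullDocument);
      break;
    case "update":
      // fullDocument is only present when the stream uses updateLookup
      if (change.fullDocument) {
        cache.set(key, change.fullDocument);
      } else {
        cache.delete(key);
      }
      break;
    case "delete":
      cache.delete(key);
      break;
    default:
      break;
  }
  return cache;
}

const cache = new Map();
applyChangeToCache(cache, {
  operationType: "insert",
  documentKey: { _id: 1 },
  fullDocument: { _id: 1, status: "pending" }
});
applyChangeToCache(cache, {
  operationType: "update",
  documentKey: { _id: 1 },
  updateDescription: { updatedFields: { status: "shipped" }, removedFields: [] }
});
console.log(cache.has("1"));
// false
```

Evicting on a bare update trades an extra read for simplicity; a stream opened with fullDocument: "updateLookup" would let the handler refresh the entry instead.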
Key Takeaways
- Change streams provide real-time, ordered notifications of data changes.
- Use aggregation pipelines in watch() to filter events server-side.
- Resume tokens let you restart streams without missing events.
- Streams work at the collection, database, or cluster level and require a replica set or sharded cluster.