下面是我用来监视用户集合更新的示例代码。在redis中存储简历令牌。每当changestream停止时,令牌就被存储起来,当流再次打开时,它就从那里开始。
const watchMatch = [
{ $match: { 'fullDocument.watchIndex': {$gte: 0} } },
{ $addFields: { watchSource: 'watcher1' } }
];
const collection = db.collection('users');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
// Store/Update current resume token
const resumeToken = "check if token object is available in redis";
if (resumeToken) {
changeStream.close();
// Resume watch
newChangeStream = collection.watch({ resumeAfter: resumeToken });
newChangeStream.on('change', next => {
// process next document
});
}
});但是,当流恢复时,它不会在匹配条件下过滤,我已经在主流中写入了。因此,基本上,我的问题是,如果可以的话,如何在changeStream上应用resumeAfter选项。
或者我可以得到resumeAfter?
的完整文档
发布于 2020-03-31 17:58:47
当您与resumeAfter选项一起恢复更改流时,可以传递管道。有点像
newChangeStream = collection.watch(watchMatch,{ resumeAfter: resumeToken});https://stackoverflow.com/questions/60734856
复制相似问题