使用反应性mongo模板,我尝试只为插入操作侦听6个集合的mongo更改流。下面是我开始监听更改流的代码:
更改流启动:
//Loop for 6 collections
reactiveMongoTemplate.changeStream(collectionName, changeStreamOptions, MongoChangeStreamEvent.class)
.doOnNext(changeStreamEvent -> {
System.out.println("In doOnNext: Received a new change stream event ");
})
.map(changeStreamEvent -> {
//save resume token
})
.onErrorResume(throwable -> {
System.out.println("In onErrorResume of change stream event :: " + throwable.getMessage());
return null;
})
.subscribe();在每个新条目上,我将简历令牌保存在我的集合中:
//Document
public class MongoChangeStreamEvent implements Serializable {
@Id
private String id;
private String resumeToken;
//other fields and getters and setters
}使用更改流选项进行启动。如果集合中存在简历令牌,请使用它。否则现在就继续。
ChangeStreamOptions changeStreamOptions;
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting
//MongoChangeStreamEvent collection in descending order based on
//resumeToken
if (!resumeToken.isEmpty()) {
changeStreamOptions = ChangeStreamOptions.builder()
.filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
.resumeAfter(resumeToken)
.build();
} else {
changeStreamOptions = ChangeStreamOptions.builder()
.filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
.resumeAt(Instant.now())
.build();
}引发的错误:
Suppressed: com.mongodb.MongoQueryException: Query failed with error code 280 and error message
'cannot resume stream; the resume token was not found. {_data:
"825EAFCA12000000162B022C0100296E5A100440284C0343E64ADEB43522FC0552CC1446645F696400645EAFCA12B51E93000716B9300004"}'到目前为止,我对这个功能的工作有不同的经验。在没有现有简历标记的情况下启动的应用程序总是按预期工作。当我用现有的简历标记重新启动应用程序时,其他发现如下:
我理解变更流依赖于文档中引用的oplogs历史。对我来说,更令人惊讶的是,被错误删除的简历标记与我现有的任何简历标记都不匹配,而且也没有出现在简历中。
我已经验证了提交给反应性mongo模板的简历标记总是正确的。
发布于 2020-05-04 18:22:19
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting
//MongoChangeStreamEvent collection in descending order based on
//resumeToken这个逻辑所依据的是什么文档?
在当前驱动程序中,驱动程序应该提供将更改流的简历令牌传递给应用程序的功能。这个简历标记可以来自几个来源,而且它也可能具有不同的格式。在任何情况下,应用程序都不应该对简历令牌或类似的内容进行排序。
例如,您可以看到何时检索到每个简历令牌并对该时间戳进行排序,但对简历令牌唯一合适的操作是将其返回给驱动程序,以启动新的更改流。
https://stackoverflow.com/questions/61598140
复制相似问题