我创建了一个微服务,它监听一个集合(即e- mycollection)的mongodb变更流通知,并更新另一个集合(即e- data集合)中的变更流数据。
为了提高可伸缩性,我创建了我的应用程序的10个实例,它们将读取相同的变更流通知。
前-
假设一次有100条记录在"mycollection“集合中被更新。因此,change stream将收到100条记录的通知。
第一个实例应该从变更流中读取1- 20条记录,并更新其他集合中的记录(即E- other集合)
第二个实例应该更新变更流中的21 - 40条记录,并更新集合"yourcollection“中的记录。
3nd实例应该更新来自changestream的41 - 60条记录,并更新集合“that collection”中的记录。
所以..。在..。
没有两个实例会从变更流中获得相同的重复数据。
有没有办法实现上述要求。
代码快照
下面是我的代码,每个进程都会获得重复的数据。有什么建议来处理这种情况吗?
公共字符串startProcess() {
MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://localhost:27017,localhost:27018,localhost:27019/user?replSet=simpliReplica"));
MongoDatabase database = mongoClient.getDatabase("mypoc");
MongoCollection<Document> collection = database.getCollection("user");
try {
Block<ChangeStreamDocument<Document>> printBlock = new Block<ChangeStreamDocument<Document>>() {
public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
System.out.println(" MyService:::"+changeStreamDocument.getFullDocument());
}
};
// collection.watch - Establishes a Change Stream on a collection.This will identify any changes happening to the collection.
collection.watch(asList(Aggregates.match(Filters.in("operationType", asList("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP).forEach(printBlock);
} catch (IOException e) {
e.printStackTrace();
}
}
}谢谢Dillip
发布于 2018-10-04 23:27:27
您可以尝试使用以下命令定义不同的数据库/集合:
MongoDatabase database = mongoClient.getDatabase("mypoc");
MongoCollection<Document> collection = database.getCollection("user");然后使用模来迭代您的数据,并将它们放入正确的集合中。例如:
https://stackoverflow.com/questions/51376080
复制相似问题