首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用无效的简历令牌恢复反应性mongo更改流。

使用无效的简历令牌恢复反应性mongo更改流。
EN

Stack Overflow用户
提问于 2020-05-04 17:29:51
回答 1查看 1.9K关注 0票数 1

使用反应性mongo模板,我尝试只为插入操作侦听6个集合的mongo更改流。下面是我开始监听更改流的代码:

更改流启动:

代码语言:javascript
复制
//Loop for 6 collections
reactiveMongoTemplate.changeStream(collectionName, changeStreamOptions, MongoChangeStreamEvent.class)
          .doOnNext(changeStreamEvent -> {          
            System.out.println("In doOnNext: Received a new change stream event ");         
          })
          .map(changeStreamEvent -> {    
            //save resume token
          })
          .onErrorResume(throwable -> {
            System.out.println("In onErrorResume of change stream event :: " + throwable.getMessage());
            return null;
          })
          .subscribe();

在每个新条目上,我将简历令牌保存在我的集合中:

代码语言:javascript
复制
//Document
public class MongoChangeStreamEvent implements Serializable {
  @Id
  private String id;
  private String resumeToken;
  //other fields and getters and setters
}

使用更改流选项进行启动。如果集合中存在简历令牌,请使用它。否则现在就继续。

代码语言:javascript
复制
ChangeStreamOptions changeStreamOptions;
    final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting 
                                     //MongoChangeStreamEvent collection in descending order based on 
                                     //resumeToken

    if (!resumeToken.isEmpty()) {
      changeStreamOptions = ChangeStreamOptions.builder()
          .filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))
          .resumeAfter(resumeToken)
          .build();
    } else {
      changeStreamOptions = ChangeStreamOptions.builder()
          .filter(newAggregation(match(where(OPERATION_TYPE).is(INSERT_OPERATION))))        
          .resumeAt(Instant.now())
          .build();
    }

引发的错误:

代码语言:javascript
复制
Suppressed: com.mongodb.MongoQueryException: Query failed with error code 280 and error message 
'cannot resume stream; the resume token was not found. {_data: 
"825EAFCA12000000162B022C0100296E5A100440284C0343E64ADEB43522FC0552CC1446645F696400645EAFCA12B51E93000716B9300004"}'

到目前为止,我对这个功能的工作有不同的经验。在没有现有简历标记的情况下启动的应用程序总是按预期工作。当我用现有的简历标记重新启动应用程序时,其他发现如下:

  1. 有时它对所有的6人都是完美的。
  2. 重新启动后,它启动了几个,其余的失败了。
  3. 重新启动后,启动就没有抛出任何错误。但是,在被监视的集合中插入文档时,很少/全部出错。

我理解变更流依赖于文档中引用的oplogs历史。对我来说,更令人惊讶的是,被错误删除的简历标记与我现有的任何简历标记都不匹配,而且也没有出现在简历中。

我已经验证了提交给反应性mongo模板的简历标记总是正确的。

  1. 如果我遗漏了什么,请告诉我。
  2. 此外,我还想知道如何处理运行中的许多更改流中的一个/几个失败。
EN

回答 1

Stack Overflow用户

发布于 2020-05-04 18:22:19

代码语言:javascript
复制
final BsonDocument resumeToken = //getLatestResumeToken, first entry after sorting 
                                 //MongoChangeStreamEvent collection in descending order based on 
                                 //resumeToken

这个逻辑所依据的是什么文档?

在当前驱动程序中,驱动程序应该提供将更改流的简历令牌传递给应用程序的功能。这个简历标记可以来自几个来源,而且它也可能具有不同的格式。在任何情况下,应用程序都不应该对简历令牌或类似的内容进行排序。

例如,您可以看到何时检索到每个简历令牌并对该时间戳进行排序,但对简历令牌唯一合适的操作是将其返回给驱动程序,以启动新的更改流。

Ruby驱动程序示例:https://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming-a-change-stream

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61598140

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档