首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Debezium Mongo无法读取oplog,即使使用changestream

Debezium Mongo无法读取oplog,即使使用changestream
EN

Stack Overflow用户
提问于 2022-03-15 10:15:33
回答 1查看 548关注 0票数 1

我需要实现CDC模式,但无法使其工作:我的debezium工作人员已经启动并运行,但我的连接器仍然失败,尽管我的努力。

我在我的mongo集群上测试了一个简单的“手表”,它起作用了:

代码语言:javascript
复制
watchCursor = db.mydb.watch()
while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      print(watchCursor.next());
   }
}

因此,我可以看出我的用户在集群上有观看变更的权限。

在运行任务时,我仍然有一个错误:

代码语言:javascript
复制
"tasks": [
    {
        "id": 0,
        "state": "FAILED",
        "worker_id": "10.114.129.247:8083",
        "trace": "org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:115)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: org.apache.kafka.connect.errors.ConnectException: Error while attempting to get oplog position\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:153)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:135)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:108)\n\t... 5 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error while attempting to get oplog position\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.lambda$establishConnectionToPrimary$3(MongoDbSnapshotChangeEventSource.java:234)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:292)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.lambda$determineSnapshotOffsets$6(MongoDbSnapshotChangeEventSource.java:295)\n\tat java.base/java.util.HashMap$Values.forEach(HashMap.java:976)\n\tat io.debezium.connector.mongodb.ReplicaSets.onEachReplicaSet(ReplicaSets.java:115)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.determineSnapshotOffsets(MongoDbSnapshotChangeEventSource.java:290)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.doExecute(MongoDbSnapshotChangeEventSource.java:99)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.doExecute(MongoDbSnapshotChangeEventSource.java:52)\n\tat io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)\n\t... 8 more\nCaused by: com.mongodb.MongoQueryException: Query failed with error code 13 and error message 'not authorized on local to execute command { find: \"oplog.rs\", filter: {}, sort: { $natural: -1 }, limit: 1, singleBatch: true, $db: \"local\", lsid: { id: UUID(\"de332d4a-32ec-424f-ae09-b32376444d11\") }, $readPreference: { mode: \"primaryPreferred\" } }' on server rc1a-REDACTED.mdb.yandexcloud.net:27018\n\tat com.mongodb.internal.operation.FindOperation$1.call(FindOperation.java:663)\n\tat com.mongodb.internal.operation.FindOperation$1.call(FindOperation.java:653)\n\tat com.mongodb.internal.operation.OperationHelper.withReadConnectionSource(OperationHelper.java:583)\n\tat com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:653)\n\tat com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:81)\n\tat com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:184)\n\tat com.mongodb.client.internal.FindIterableImpl.first(FindIterableImpl.java:200)\n\tat io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource.lambda$determineSnapshotOffsets$5(MongoDbSnapshotChangeEventSource.java:297)\n\tat io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:288)\n\t... 15 more\n"
    }
]

简单地说,我在oplog.rs上没有权利。尽管我不是用oplog,而是用"changestream“。

这是我的配置

代码语言:javascript
复制
{
"name": "account3-connector",
"config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "errors.log.include.messages": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "mongodb.password": "REDACTED",
    "transforms": "unwrap,idToKey,extractIdKey",
    "capture.mode": "change_streams_update_full",
    "collection.include.list": "account.account",
    "mongodb.ssl.enabled": "false",
    "transforms.idToKey.fields": "id",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms.extractIdKey.field": "id",
    "database.include.list": "account",
    "errors.log.enable": "true",
    "mongodb.hosts": "rc1a-REDACTED.mdb.yandexcloud.net:27018,rc1b-REDACTED.mdb.yandexcloud.net:27018,rc1c-REDACTED.mdb.yandexcloud.net:27018",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms.idToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "mongodb.user": "debezium",
    "mongodb.name": "loyalty.raw.last",
    "key.converter.schemas.enable": "false",
    "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "name": "account3-connector",
    "errors.tolerance": "all",
    "transforms.extractIdKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key"
},
"tasks": [],
"type": "source"
}

你们有什么想法吗?有什么不对的?

我在汇合基图6.2.0上使用Debezium 1.8.1

代码语言:javascript
复制
FROM confluentinc/cp-kafka-connect-base:6.2.0
EN

回答 1

Stack Overflow用户

发布于 2022-10-17 15:14:04

我也遇到过同样的问题。我没有修复程序,但是在初始化过程中,即使在使用更改流时,连接器也会尝试查询oplog。

这是完整的堆栈跟踪:

代码语言:javascript
复制
com.mongodb.MongoQueryException: Query failed with error code 13 with name 'Unauthorized' and error message 'not authorized on local to execute command { find: \"oplog.rs\", filter: {}, sort: { $natural: -1 }, limit: 1, singleBatch: true, $db: \"local\", lsid: { id: UUID(\"51399eae-bb7a-4706-b35c-048e878d23a8\") }, $readPreference: { mode: \"primaryPreferred\" } }' on server pl-0-westeurope-azure.pi8eh.mongodb.net:1025
    at com.mongodb.internal.operation.FindOperation.lambda$execute$1(FindOperation.java:699)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:566)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:591)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:565)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:591)
    at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:564)
    at com.mongodb.internal.operation.FindOperation.lambda$execute$2(FindOperation.java:690)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
    at com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:722)
    at com.mongodb.internal.operation.FindOperation.execute(FindOperation.java:86)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
    at com.mongodb.client.internal.FindIterableImpl.first(FindIterableImpl.java:213)
    at io.debezium.connector.mongodb.MongoUtil.getOplogEntry(MongoUtil.java:236)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$initializeOffsets$4(MongoDbStreamingChangeEventSource.java:306)
    at io.debezium.connector.mongodb.ConnectionContext$MongoPrimary.execute(ConnectionContext.java:317)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.lambda$initializeOffsets$5(MongoDbStreamingChangeEventSource.java:305)
    at java.base/java.util.HashMap$Values.forEach(HashMap.java:977)
    at io.debezium.connector.mongodb.ReplicaSets.onEachReplicaSet(ReplicaSets.java:120)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.initializeOffsets(MongoDbStreamingChangeEventSource.java:300)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:89)
    at io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource.execute(MongoDbStreamingChangeEventSource.java:51)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

调用MongoUtil.getOplogEntry方法,从“本地”数据库查询"oplog.rs“集合。我的猜测是用户没有执行此查询的权限。

我不知道需要哪个权限,也不知道是否可以以某种方式跳过这个步骤。

从文档来看,添加读取oplog的权限似乎应该修复这个问题,但我还没有证实这一点。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71480455

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档