我的应用程序中的Kafka debezium-postgres连接器抛出了以下错误:
org.apache.kafka.connect.errors.ConnectException: Unable to obtain valid replication slot. Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector
at io.debezium.connector.postgresql.connection.PostgresConnection.readReplicationSlotInfo(PostgresConnection.java:226)
at io.debezium.connector.postgresql.connection.PostgresConnection.getReplicationSlotState(PostgresConnection.java:150)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:98)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:49)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)应用程序使用PostgreSQLVersion9.6.11,max_replication_slots的值为10。我可以看到数据库中的活动逻辑replication_slot为confirmed_flush_lsn = null,restart_lsn = 3/93043310,catalog_xmin = 202656,active = t,datoid = 16407,slot_type = logical,active_pid = 32183,plugin = wal2json,slot_name = slot1,数据库= db1 (我用虚拟值替换了槽名和db名称)。
根据我的理解,因为这里的逻辑复制槽的confirmed_flush_lsn = null会导致这个错误,因为它会阻止连接器找到这个插槽。
我如何解决这个问题,为什么confirmed_flush_lsn值将为空?
发布于 2020-01-08 19:28:21
我通过重新启动由连接器引用的aws中的rds DB实例来修复它,之后,confirmed_flush_lsn的值被重置为类似于(restart_lsn = 3/93043310)的非空值。卡夫卡-连接能够找到replication_slot "slot1“的预期。连接器也被打开了。这暂时解决了我的问题,但首先我还是想了解如何将confirmed_flush_lsn =null设置为逻辑replication_slot。
https://stackoverflow.com/questions/59634983
复制相似问题