我已经设置了环境与Postgresql数据库,使用debezium连接器与卡夫卡连接和卡夫卡。Kafka有多个实例(3)正在运行,并且配置了Zookeeper (3)整个流水线中的.The连接正在工作,但根据Debezium的文档,没有根据数据库中的表自动创建主题。例如,如果表A和表B在某个模式中,我假设这两个主题是在Kafka中隐式创建的。连接器和任务的状态是RUNNING,下面提到的是我对连接器所做的配置。
{
"name": "geo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": <dbHostName>,
"database.port": <dbPort>,
"database.user": <dbUser>,
"database.password":<dbPassword> ,
"database.dbname" : <dbName>,
"database.server.name": <logicalName>,
"database.history.kafka.bootstrap.servers":<>,
"database.history.kafka.topic": "schema-changes.inventory",
"plugin.name":"wal2json",
"config.storage.replication.factor": "3",
"offset.storage.replication.factor" : "3",
"auto.create.topics.enable" : "true",
"snapshot.mode" : "always"
}
}我在连接日志中看到的错误是,
2018-08-09 15:28:50,409 - DEBUG [KafkaBasedLog Work Thread - kconnect-offsets:Fetcher@199] - [Consumer clientId=consumer-1, groupId=1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(kconnect-offsets-10, kconnect-offsets-4, kconnect-offsets-16, kconnect-offsets-7, kconnect-offsets-19, kconnect-offsets-13, kconnect-offsets-22, kconnect-offsets-1)) to broker kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2002 rack: pro06)
2018-08-09 15:28:50,465 - DEBUG [kafka-producer-network-thread | producer-6:NetworkClient$DefaultMetadataUpdater@927] - [Producer clientId=producer-6] Sending metadata request (type=MetadataRequest, topics=dbserver1.public.spatial_ref_sys) to node kafka-01.hotel02.pro05.eu.idealo.com:9092 (id: 2004 rack: pro05)
2018-08-09 15:28:50,467 - WARN [kafka-producer-network-thread | producer-6:NetworkClient$DefaultMetadataUpdater@882] - [Producer clientId=producer-6] Error while fetching metadata with correlation id 23856 : {dbserver1.public.spatial_ref_sys=UNKNOWN_TOPIC_OR_PARTITION}
2018-08-09 15:28:50,467 - DEBUG [kafka-producer-network-thread | producer-6:Metadata@270] - Updated cluster metadata version 23852 to Cluster(id = BwqlZApfT-ygzWr_wPcdng, nodes = [kafka-03.hotel02.pro05.eu.idealo.com:9092 (id: 2003 rack: pro05), kafka-01.hotel02.pro05.eu.idealo.com:9092 (id: 2004 rack: pro05), kafka-02.hotel02.pro06.eu.idealo.com:9092 (id: 2002 rack: pro06)], partitions = [])发布于 2018-08-13 15:08:41
您在日志中看到的消息是警告,而不是错误。您可以尝试使用kafka-topics.sh实用程序来列出可用的主题吗?
https://stackoverflow.com/questions/51770951
复制相似问题