我正在为debezium使用AWS模式注册表。
在debezium中,我提到服务器名为mysql-db01。因此,debezium将使用此服务器名称创建一个主题,以添加一些关于服务器和模式更改的元数据。
当我部署连接器时,在模式注册表中得到的模式如下所示。
{
"type": "record",
"name": "SchemaChangeKey",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "databaseName",
"type": "string"
}
],
"connect.name": "io.debezium.connector.mysql.SchemaChangeKey"
}然后立即创建了另一个版本,如下所示。
{
"type": "record",
"name": "SchemaChangeValue",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "connector",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "ts_ms",
"type": "long"
},
{
"name": "snapshot",
"type": [
{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false"
},
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
},
"null"
],
"default": "false"
},
{
"name": "db",
"type": "string"
},
{
"name": "sequence",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "server_id",
"type": "long"
},
{
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "file",
"type": "string"
},
{
"name": "pos",
"type": "long"
},
{
"name": "row",
"type": "int"
},
{
"name": "thread",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "query",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.debezium.connector.mysql.Source"
}
},
{
"name": "databaseName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "schemaName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ddl",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "tableChanges",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "Change",
"namespace": "io.debezium.connector.schema",
"fields": [
{
"name": "type",
"type": "string"
},
{
"name": "id",
"type": "string"
},
{
"name": "table",
"type": {
"type": "record",
"name": "Table",
"fields": [
{
"name": "defaultCharsetName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "primaryKeyColumnNames",
"type": [
"null",
{
"type": "array",
"items": "string"
}
],
"default": null
},
{
"name": "columns",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "Column",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "jdbcType",
"type": "int"
},
{
"name": "nativeType",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "typeName",
"type": "string"
},
{
"name": "typeExpression",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "charsetName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "length",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "scale",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "position",
"type": "int"
},
{
"name": "optional",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "autoIncremented",
"type": [
"null",
"boolean"
],
"default": null
},
{
"name": "generated",
"type": [
"null",
"boolean"
],
"default": null
}
],
"connect.name": "io.debezium.connector.schema.Column"
}
}
}
],
"connect.name": "io.debezium.connector.schema.Table"
}
}
],
"connect.name": "io.debezium.connector.schema.Change"
}
}
}
],
"connect.name": "io.debezium.connector.mysql.SchemaChangeValue"这两个架构不匹配,因此AWS架构注册表不允许连接器注册第二个版本。但是第二个版本是连接器的实际模式。
为了解决这个问题,我删除了模式(在模式注册表中)。然后删除连接器,重新部署连接器,然后它就可以工作了。
但是我试图理解为什么模式第一次会有不同的版本。
发布于 2021-08-07 03:37:18
我在源连接器和宿连接器上使用了以下键/值转换器来使其工作。
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"internal.key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"internal.value.converter.schemas.enable": "false",
"value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.region": "ap-south-1",
"key.converter.schemaAutoRegistrationEnabled": "true",
"value.converter.schemaAutoRegistrationEnabled": "true",
"key.converter.avroRecordType": "GENERIC_RECORD",
"value.converter.avroRecordType": "GENERIC_RECORD",
"key.converter.registry.name": "bhuvi-debezium",
"value.converter.registry.name": "bhuvi-debezium",https://stackoverflow.com/questions/68621802
复制相似问题