我很难实现Kafka的动态消息值的功能。我使用的是confluent-kafka-python的AvroProducer和schema registry。生产者将以如下格式发送消息:
{'id':1, 'name':'A', 'properties': {'key1': 'value1', 'key2': 'value2', 'key2': 'value3'}},
{'id': 2, 'name': 'X', 'properties': {'key1': 'value1'}} 消息之间的properties可能会有所不同。因此,有些可能有更多的键、值对,而有些可能有更少的键、值对。我正在尝试使用Kafka connect将这条消息从kafka发送到postgresql。我希望properties在postgresql数据库中是json类型。
如何做到这一点呢?任何指点都会非常感谢。谢谢。
发布于 2020-09-18 01:26:37
对于您的示例,声明数组的单个模式就足够了。例如,在AVRO中,模式定义将如下所示:
{
"name": "MyRecord",
"type":"record",
"fields":[
{
"name":"id",
"type":"long"
},
{
"name":"name",
"type":"string"
},
{
"name":"properties",
"type": "array",
"items":{
"name":"mykvprop",
"type":"record",
"fields":[
{
"name":"key",
"type":"string"
},
{
"name":"value",
"type":"string"
}
]
}
}
]
}如果您的消息因数据类型不同而不同,则需要使用AVRO联合、多模式主题或两者都使用的更复杂的解决方案。例如,如果属性数组的键和值不都是字符串。更多信息:
如果您使用的是Kafka Connect API而不是Kafka生产者API,这些高级情况会变得复杂,因为Connect API有一个内部架构,它似乎限制了您的选项(no union?)。
https://stackoverflow.com/questions/58376750
复制相似问题