首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Kafka中处理动态消息值?

如何在Kafka中处理动态消息值?
EN

Stack Overflow用户
提问于 2019-10-14 20:23:54
回答 1查看 228关注 0票数 1

我很难实现Kafka的动态消息值的功能。我使用的是confluent-kafka-pythonAvroProducerschema registry。生产者将以如下格式发送消息:

代码语言:javascript
复制
{'id':1, 'name':'A', 'properties': {'key1': 'value1', 'key2': 'value2', 'key2': 'value3'}},
{'id': 2, 'name': 'X', 'properties': {'key1': 'value1'}} 

消息之间的properties可能会有所不同。因此,有些可能有更多的键、值对,而有些可能有更少的键、值对。我正在尝试使用Kafka connect将这条消息从kafka发送到postgresql。我希望propertiespostgresql数据库中是json类型。

如何做到这一点呢?任何指点都会非常感谢。谢谢。

EN

回答 1

Stack Overflow用户

发布于 2020-09-18 01:26:37

对于您的示例,声明数组的单个模式就足够了。例如,在AVRO中,模式定义将如下所示:

代码语言:javascript
复制
{
    "name": "MyRecord",
    "type":"record",
    "fields":[
        {
            "name":"id",
            "type":"long"
        },
        {
            "name":"name",
            "type":"string"
        },
        {
            "name":"properties",
            "type": "array",  
             "items":{
                    "name":"mykvprop",
                    "type":"record",
                    "fields":[
                        {
                          "name":"key", 
                          "type":"string"
                        },
                        {
                          "name":"value", 
                          "type":"string"
                        }
                    ]
                }
        }
    ] 
}

如果您的消息因数据类型不同而不同,则需要使用AVRO联合、多模式主题或两者都使用的更复杂的解决方案。例如,如果属性数组的键和值不都是字符串。更多信息:

如果您使用的是Kafka Connect API而不是Kafka生产者API,这些高级情况会变得复杂,因为Connect API有一个内部架构,它似乎限制了您的选项(no union?)

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58376750

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档