首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Confluent-Kafka: Python使用者中的Avro序列化与模式处理的混淆

Confluent-Kafka: Python使用者中的Avro序列化与模式处理的混淆
EN

Stack Overflow用户
提问于 2018-10-24 03:15:16
回答 1查看 1.6K关注 0票数 1

我试图理解Avro系列化的合流卡夫卡和模式注册表的用法。一直到最后,一切都进行得很顺利,但阿夫罗的最终期望给我带来了很多困惑。根据我的阅读和理解,Avro序列化为我们提供了灵活性,当我们对模式进行更改时,我们可以简单地管理它,而不会影响老的生产者/消费者。

随后,我开发了一个python生成器,它将在中检查Schema是否存在,如果没有,创建它并开始生成下面显示的json消息。当我需要更改模式时,我只需在我的生产者中更新它,这将生成带有新模式的消息。

我的旧模式:

代码语言:javascript
复制
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

生产者提供的样本数据-1:

代码语言:javascript
复制
{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}

我的新模式:

代码语言:javascript
复制
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

生产者提供的样本数据-2:

代码语言:javascript
复制
{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}

案例1:当有两个以上两个模式的生产者一起运行时,我可以使用下面的代码成功地使用消息。在这里之前一切都很好。

代码语言:javascript
复制
while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        xxxxx 
        break
    print msg.value()

案例2:当我深入研究JSON字段时,就会出现混乱和中断。

首先,假设我有一个生产者运行上面的“messages”,并且有一个使用者成功地使用了这些消息。

代码语言:javascript
复制
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

当我使用上面提到的‘我的新模式’运行第二个生产者时,我的老消费者会中断,因为没有字段passport_expiry_date和passport_make_date,这是真的。

问题:

有时我认为,这是预期的,因为是我(开发人员)使用的字段名,而不是在消息。但阿夫罗在这里能帮上什么忙?丢失的字段不应该由Avro来处理吗?我在JAVA中看到了处理这种情况的例子,但是在Python中没有找到任何例子。例如,在github下面有处理此场景的完美示例。当字段不存在时,使用者只需打印“None”。

https://github.com/LearningJournal/ApacheKafkaTutorials

案例3:当我运行“旧生产者”和“旧消费者”这样的组合,然后在另一个终端“新生产者”和“新消费者”中,生产者/消费者会混在一起,事情就会中断,说不出json字段。

旧消费者==>

代码语言:javascript
复制
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

新型消费==>

代码语言:javascript
复制
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]

问题:

我想,这也是意料之中的事。但是,Avro让我认为正确的消费者应该用正确的模式获得正确的消息。如果我使用msg.value(),并且总是在消费者端使用没有任何Avro角色的编程来解析字段,那么使用avro的好处是什么?在SR中发送带有消息/存储的架构有什么好处?

最后,是否有任何方法检查附加到消息的架构?我了解到,在Avro中,模式ID与消息附加在一起,该消息在读取和写入消息时与schema一起使用。但我从来没有在留言中看到过。

提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-10-26 06:42:30

还不清楚您在注册表上使用的兼容性设置是什么,但我将向后假设,这意味着您需要添加一个默认字段。

听起来你得到了一个Python,因为这些密钥不存在。

而不是msg.value()["non-existing-key"],您可以尝试

选项1:像对待dict()一样对待它

代码语言:javascript
复制
msg.value().get("non-existing-key", "Default value")

选项2:单独检查可能不存在的所有密钥

代码语言:javascript
复制
some_var = None  # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
    some_var = "Default Value"

否则,您必须在旧数据上“投影”较新的模式,这就是Java代码通过使用SpecificRecord子类所做的事情。这样,旧数据将被解析为较新的架构,后者具有新的字段及其默认值。

如果您在Java中使用GenericRecord,则会出现类似的问题。我不确定Python中是否有相当于SpecificRecord的内容。

顺便说一句,我不认为字符串"None"可以应用于logicalType=timestamp

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52960587

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档