我试图理解Avro系列化的合流卡夫卡和模式注册表的用法。一直到最后,一切都进行得很顺利,但阿夫罗的最终期望给我带来了很多困惑。根据我的阅读和理解,Avro序列化为我们提供了灵活性,当我们对模式进行更改时,我们可以简单地管理它,而不会影响老的生产者/消费者。
随后,我开发了一个python生成器,它将在中检查Schema是否存在,如果没有,创建它并开始生成下面显示的json消息。当我需要更改模式时,我只需在我的生产者中更新它,这将生成带有新模式的消息。
我的旧模式:
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'生产者提供的样本数据-1:
{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}我的新模式:
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'生产者提供的样本数据-2:
{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}案例1:当有两个以上两个模式的生产者一起运行时,我可以使用下面的代码成功地使用消息。在这里之前一切都很好。
while True:
try:
msg = c.poll(10)
except SerializerError as e:
xxxxx
break
print msg.value()案例2:当我深入研究JSON字段时,就会出现混乱和中断。
首先,假设我有一个生产者运行上面的“messages”,并且有一个使用者成功地使用了这些消息。
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]当我使用上面提到的‘我的新模式’运行第二个生产者时,我的老消费者会中断,因为没有字段passport_expiry_date和passport_make_date,这是真的。
问题:
有时我认为,这是预期的,因为是我(开发人员)使用的字段名,而不是在消息。但阿夫罗在这里能帮上什么忙?丢失的字段不应该由Avro来处理吗?我在JAVA中看到了处理这种情况的例子,但是在Python中没有找到任何例子。例如,在github下面有处理此场景的完美示例。当字段不存在时,使用者只需打印“None”。
https://github.com/LearningJournal/ApacheKafkaTutorials
案例3:当我运行“旧生产者”和“旧消费者”这样的组合,然后在另一个终端“新生产者”和“新消费者”中,生产者/消费者会混在一起,事情就会中断,说不出json字段。
旧消费者==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]新型消费==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]问题:
我想,这也是意料之中的事。但是,Avro让我认为正确的消费者应该用正确的模式获得正确的消息。如果我使用msg.value(),并且总是在消费者端使用没有任何Avro角色的编程来解析字段,那么使用avro的好处是什么?在SR中发送带有消息/存储的架构有什么好处?
最后,是否有任何方法检查附加到消息的架构?我了解到,在Avro中,模式ID与消息附加在一起,该消息在读取和写入消息时与schema一起使用。但我从来没有在留言中看到过。
提前谢谢。
发布于 2018-10-26 06:42:30
还不清楚您在注册表上使用的兼容性设置是什么,但我将向后假设,这意味着您需要添加一个默认字段。
听起来你得到了一个Python,因为这些密钥不存在。
而不是msg.value()["non-existing-key"],您可以尝试
选项1:像对待dict()一样对待它
msg.value().get("non-existing-key", "Default value")选项2:单独检查可能不存在的所有密钥
some_var = None # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
some_var = "Default Value"否则,您必须在旧数据上“投影”较新的模式,这就是Java代码通过使用SpecificRecord子类所做的事情。这样,旧数据将被解析为较新的架构,后者具有新的字段及其默认值。
如果您在Java中使用GenericRecord,则会出现类似的问题。我不确定Python中是否有相当于SpecificRecord的内容。
顺便说一句,我不认为字符串"None"可以应用于logicalType=timestamp
https://stackoverflow.com/questions/52960587
复制相似问题