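I am trying to use aliases to convert some data that matches old_schema to the field names used in new_schema.
I have been at this for a long time and can't see what is wrong with this code: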
from fastavro import writer, reader, json_writer
from fastavro.schema import parse_schema
from io import BytesIO

# Sample data
input_json = [
    {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3"
    }
]

# Old schema that matches the input_json
old_schema = parse_schema({
    "type": "record",
    "namespace": "com.node40",
    "name": "generated",
    "fields": [
        {
            "name": "key1",
            "type": "string"
        },
        {
            "name": "key2",
            "type": "string"
        },
        {
            "name": "key3",
            "type": "string"
        }
    ]
})

# New schema with old schema names as aliases
new_schema = parse_schema({
    "type": "record",
    "namespace": "com.node40",
    "name": "test",
    "fields": [
        {
            "name": "k1",
            "type": "string",
            "aliases": ["key1"]
        },
        {
            "name": "k2",
            "type": "string",
            "aliases": ["key2"]
        },
        {
            "name": "k3",
            "type": "string",
            "aliases": ["key3"]
        }
    ]
})

records = [
    {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3"
    }
]

# Write to buffer as serialized avro using old_schema
buffer = BytesIO()
writer(buffer, old_schema, input_json, validator=True)
buffer.seek(0)

# Read serialized avro from buffer, deserialize and write to json file
input_avro = reader(buffer, new_schema)
json_writer('fitted_data.json', new_schema, input_avro)

This raises an error from fastavro. It is such a simple example, but I just can't see what is wrong with it. Any help is appreciated!
Posted on 2022-10-06 13:03:28
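The main problem is that the old schema is named generated with the namespace com.node40, while the new schema has the same namespace but is named test. The Avro schema resolution rules state that records match only when "both schemas are records with the same (unqualified) name", so the reader schema test cannot be resolved against data written as generated.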
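To make the rule concrete, here is a minimal sketch of the name check in plain Python. It does not use fastavro and is not how fastavro implements resolution; `unqualified_name` and `record_names_match` are hypothetical helpers, and the schemas are raw dicts with the field lists left empty for brevity.

```python
def unqualified_name(name):
    """Strip any namespace prefix: 'com.node40.generated' -> 'generated'."""
    return name.rsplit(".", 1)[-1]


def record_names_match(writer_schema, reader_schema):
    """Hypothetical helper: compare record names before any aliases are considered."""
    return unqualified_name(writer_schema["name"]) == unqualified_name(reader_schema["name"])


# Raw schema dicts mirroring the question (fields omitted for brevity)
old_schema = {"type": "record", "namespace": "com.node40", "name": "generated", "fields": []}
new_schema = {"type": "record", "namespace": "com.node40", "name": "test", "fields": []}

print(record_names_match(old_schema, new_schema))  # False: 'generated' != 'test'
```

Sharing a namespace does not help here, because the comparison is on the unqualified record name.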
So you can either rename the new schema to match the old one, or use aliases again and do the following on the new schema:
new_schema = {
    "type": "record",
    "namespace": "com.node40",
    "name": "test",
    "aliases": ["com.node40.generated"],
    ...
}

Note: technically you should only need to write "aliases": ["generated"], but there appears to be a bug in fastavro that does not handle that case correctly. Using the fully namespaced name works.
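As a rough illustration of what the aliases are meant to do, the sketch below follows the Avro specification's description: a record alias without a dot is taken relative to the reader's namespace, and field aliases map the writer's field names onto the reader's. It is plain Python under those assumptions, not fastavro's code; `qualify`, `record_alias_matches`, and `rename_fields` are hypothetical helpers, and the namespace handling is simplified (a dotted name is treated as already qualified).

```python
def qualify(name, namespace):
    """Return the full name; a dotted name is treated as already qualified."""
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def record_alias_matches(writer_schema, reader_schema):
    """True if one of the reader's record aliases names the writer's record."""
    writer_full = qualify(writer_schema["name"], writer_schema.get("namespace"))
    reader_ns = reader_schema.get("namespace")
    return any(
        qualify(alias, reader_ns) == writer_full
        for alias in reader_schema.get("aliases", [])
    )


def rename_fields(record, reader_schema):
    """Map a record written with old field names onto the reader's field names."""
    renamed = {}
    for field in reader_schema["fields"]:
        # Try the reader's own field name first, then each of its aliases
        for candidate in [field["name"], *field.get("aliases", [])]:
            if candidate in record:
                renamed[field["name"]] = record[candidate]
                break
    return renamed


writer_schema = {"type": "record", "namespace": "com.node40", "name": "generated"}
reader_schema = {
    "type": "record",
    "namespace": "com.node40",
    "name": "test",
    "aliases": ["generated"],  # relative alias, resolves to com.node40.generated
    "fields": [
        {"name": "k1", "type": "string", "aliases": ["key1"]},
        {"name": "k2", "type": "string", "aliases": ["key2"]},
        {"name": "k3", "type": "string", "aliases": ["key3"]},
    ],
}

print(record_alias_matches(writer_schema, reader_schema))  # True
print(rename_fields({"key1": "value1", "key2": "value2", "key3": "value3"}, reader_schema))
# {'k1': 'value1', 'k2': 'value2', 'k3': 'value3'}
```

In this sketch the relative alias "generated" and the full alias "com.node40.generated" behave the same. That is the behavior the specification describes, and it may differ from what a given fastavro version does.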
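Even after all of that, your example will still fail, because at the end you call json_writer('fitted_data.json', new_schema, input_avro) with a file name, whereas json_writer expects an open file object. Change it to: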
with open('fitted_data.json', 'w') as fo:
    json_writer(fo, new_schema, input_avro)

https://stackoverflow.com/questions/73968193