我正在使用Benthos从Kafka读取AVRO编码的消息,它的kafka_key元数据字段设置为也包含AVRO编码的有效负载。这些AVRO编码的有效负载的模式存储在有一个用于解码它们的schema_registry_decode处理器中。我希望为每个包含两个字段的Kafka消息生成一个输出JSON消息,一个字段名为content,包含解码的AVRO消息,另一个字段名为metadata,包含由Benthos收集的各种元数据字段,包括解码的kafka_key有效载荷。
发布于 2022-02-12 00:12:49
事实证明,我们可以使用这样的branch处理器来实现这一点:
input:
kafka:
addresses:
- localhost:9092
consumer_group: benthos_consumer_group
topics:
- benthos_input
pipeline:
processors:
# Decode the message
- schema_registry_decode:
url: http://localhost:8081
# Populate output content field
- bloblang: |
root.content = this
# Decode kafka_key metadata payload and populate output metadata field
- branch:
request_map: |
root = meta("kafka_key")
processors:
- schema_registry_decode:
url: http://localhost:8081
result_map: |
root.metadata = meta()
root.metadata.kafka_key = this
output:
stdout: {}https://stackoverflow.com/questions/71087902
复制相似问题