我正在创建由FlowFiles处理器发出的下游操作和拆分的ExecuteSql。我已经用数据填充了FlowFiles的属性,我想将这些数据放入每个FlowFile内容中包含的Avro元数据中。
我该怎么做?
我尝试使用配置了UpdateRecord处理器的AvroReader和AvroRecordSetWriter,以及一个带有/canary键的属性,该属性应该在Avro文档中的某个地方为该键写入FlowFile属性。不过,它在输出的任何地方都不会出现。
将Avro数据中的记录移动到子键并将元数据部分作为记录数据的一部分是可以接受的。不过,我不喜欢这样做,因为它似乎不是正确的解决方案,而且听起来比简单地修改Avro元数据要复杂得多。
发布于 2018-01-29 23:31:57
记录感知处理器(以及读取器/撰稿人)不了解元数据,这意味着他们目前(截至NiFi 1.5.0)不能以任何方式(检查、创建、删除等)对元数据进行操作,因此UpdateRecord对元数据本身不起作用。使用您的/canary属性键,它将尝试在顶级的Avro记录中插入一个字段,名为金丝雀,并且应该具有指定的值。不过,我认为您的输出模式需要在顶层添加金丝雀字段,否则可能会忽略它(我对此不肯定,您可以检查输出模式以查看是否自动添加)。
目前没有NiFi处理器可以显式更新Avro元数据(例如,MergeContent在合并各种Avro文件方面做了一些工作,但是您不能选择设置一个值)。但是,我有一个未修饰的Groovy脚本,您可以在ExecuteScript中使用它向NiFi 1.5.0+中的Avro文件添加元数据。在ExecuteScript中,您可以将语言设置为Groovy,然后将以下内容设置为脚本主体,然后将用户定义的(又称“动态”属性)添加到ExecuteScript中,其中键将是元数据键,计算值(属性支持表达式语言)将是值:
@Grab('org.apache.avro:avro:1.8.1')
import org.apache.avro.*
import org.apache.avro.file.*
import org.apache.avro.generic.*
def flowFile = session.get()
if(!flowFile) return
try {
// Save off dynamic property values for metadata key/values later
def metadata = [:]
context.properties.findAll {e -> e.key.dynamic}.each {k,v -> metadata.put(k.name, context.getProperty(k).evaluateAttributeExpressions(flowFile).value.bytes)}
flowFile = session.write(flowFile, {inStream, outStream ->
DataFileStream<GenericRecord> reader = new DataFileStream<>(inStream, new GenericDatumReader<GenericRecord>())
DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>())
def schema = reader.schema
def inputCodec = reader.getMetaString(DataFileConstants.CODEC) ?: DataFileConstants.NULL_CODEC
// Forward the existing metadata to the output
reader.metaKeys.each { key ->
if (!DataFileWriter.isReservedMeta(key)) {
byte[] metadatum = reader.getMeta(key)
writer.setMeta(key, metadatum)
}
}
// For each dynamic property, set the key/value pair as Avro metadata
metadata.each {k,v -> writer.setMeta(k,v)}
writer.setCodec(CodecFactory.fromString(inputCodec))
writer.create(schema, outStream)
writer.appendAllFrom(reader, false)
} as StreamCallback)
session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
log.error('Error adding Avro metadata, penalizing flow file and routing to failure', e)
flowFile = session.penalize(flowFile)
session.transfer(flowFile, REL_FAILURE)
} 请注意,此脚本可以在1.5.0之前使用NiFi版本,但在1.5.0之前不支持顶部的@Grab,因此您必须将Avro及其依赖项下载到一个平面文件夹中,并指向ExecuteScript的Module属性中的@Grab。
https://stackoverflow.com/questions/48509642
复制相似问题