我正在使用kinesis传递流来发送流,从事件桥到s3存储桶。但是我似乎找不到哪个类有配置动态分区的选项?
这是我的传递流代码:
new CfnDeliveryStream(this, `Export-delivery-stream`, {
s3DestinationConfiguration: {
bucketArn: bucket.bucketArn,
roleArn: kinesisFirehoseRole.roleArn,
prefix: `test/!{timestamp:yyyy/MM/dd}/`
}
});发布于 2021-10-06 17:32:34
我已经在这个问题上工作了几天,终于找到了一些有用的东西。下面是如何在CDK中实现它的示例。简而言之,必须像您所做的那样启用分区,但是您需要在所谓的processingConfiguration中设置键和.jq表达式。
我们的传入json数据如下所示:
{
"data":
{
"timestamp":1633521266990,
"defaultTopic":"Topic",
"data":
{
"OUT1":"Inactive",
"Current_mA":3.92
}
}
}CDK代码如下所示:
const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
deliveryStreamName: 'deliverystream',
extendedS3DestinationConfiguration: {
cloudWatchLoggingOptions: {
enabled: true,
},
bucketArn: Bucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
errorOutputPrefix: 'error/!{firehose:error-output-type}/',
bufferingHints: {
intervalInSeconds: 60,
},
dynamicPartitioningConfiguration: {
enabled: true,
},
processingConfiguration: {
enabled: true,
processors: [
{
type: 'MetadataExtraction',
parameters: [
{
parameterName: 'MetadataExtractionQuery',
parameterValue: '{Topic: .data.defaultTopic}',
},
{
parameterName: 'JsonParsingEngine',
parameterValue: 'JQ-1.6',
},
],
},
{
type: 'AppendDelimiterToRecord',
parameters: [
{
parameterName: 'Delimiter',
parameterValue: '\\n',
},
],
},
],
},
},
})https://stackoverflow.com/questions/69038997
复制相似问题