首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >flink 1.10 DataStream API中的JSON数据聚合

flink 1.10 DataStream API中的JSON数据聚合
EN

Stack Overflow用户
提问于 2020-05-22 06:17:56
回答 1查看 298关注 0票数 0

我试图使用Kafka消息(作为Flink 1.10API StreamSource)在Elasticsearch中聚合数据。数据是以JSON格式接收的,是动态的,下面给出了样本,我想用唯一的ID组合单个文档中的多个记录,数据是按顺序排列的,它是时间序列数据。

源汇kafka和目的地汇弹性7.6.1 6

我没有发现任何好的例子,可以在下面的问题说明中使用。

代码语言:javascript
复制
Record : 1
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.325",
"Data" : 
{
 "Field1" : "ABC",
 "Field2" : "DEF"
}
}

Record : 2
{
"ID" : "1",
"timestamp" : "2020-05-07 14:34:51.725",
"Data" : 
{
 "Field3" : "GHY"
}
}

Result :

{
"ID" : "1",
"Start_timestamp" : "2020-05-07 14:34:51.325",
"End_timestamp" : "2020-05-07 14:34:51.725",
"Data" :
{
 "Field1" : "ABC",
 "Field2" : "DEF",
 "Field3" : "GHY"
}
}

以下是版本详细信息:

  1. Flink 1.10
  2. Flink-kafka-连接器2.11
  3. 弹性搜索连接器7.x
  4. Kafka 2.11
  5. JDK 1.8
EN

回答 1

Stack Overflow用户

发布于 2020-05-22 08:47:05

您所要求的可以被描述为某种形式的连接,并且有很多方法可以使用Flink来完成这一任务。在有状态充实示例中有一个Apache Flink训练,它展示了如何使用一个RichFlatMapFunction来实现类似的连接,这将帮助您开始工作。您将首先阅读相关的培训材料--至少是关于数据管道与ETL的部分。

最后,您将通过ID (通过keyBy)对流进行分区,然后使用键分区状态(在本例中可能是MapState,假设每个ID有几个属性/值对)来存储记录1中的信息,直到您准备好发出结果为止。

顺便说一句,如果这组键是无界的,那么您需要注意不要永远保持这种状态。或者在不再需要状态时清除状态(如本例所示),或者使用状态TTL来安排其最终删除。

有关Flink中其他类型联接的更多信息,请参见这个答案中的链接。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61948930

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档