I have two Kafka topics.
Messages in the news topic can carry a list of image IDs, like this:
{
"id": "news-1",
"title": "Title news-1",
"description": " description news-1",
"author": " Author news-1",
"imageIds": [
"images-1",
"images-2"
]
}
Messages in the images topic look like this:
{
"id": "image-1",
"url": "https://www.mypublication.co.uk/image-title-1.jpeg",
"description": "title-1 description",
"height": 400,
"width": 450
}
{
"id": "image-2",
"url": "https://www.mypublication.co.uk/image-title-2.jpeg",
"description": "title-2 description",
"height": 400,
"width": 450
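}
I am trying to join these two streams so that the final news message is enriched with all of the image details.
I tried using groupBy and aggregate, as shown below: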
KTable<String, Image> images = builder.table(topics.getImagesTopic(), Consumed.with(Serdes.String(), imageSerde));
KStream<String, News> news = builder.stream(topics.getNewsTopic(), Consumed.with(Serdes.String(), newsSerde));
KTable<String, NewsImages> newsImagesKTable = news
    // one record per (newsId, imageId) pair
    .flatMapValues(newsArticle -> newsArticle.getImageIds())
    // rekey by imageId so the stream can be joined with the images table (rekey not good !!?)
    .map((newsId, imageId) -> new KeyValue<>(imageId, newsId))
    .join(images,
        (newsId, image) -> new ImageWrapper(newsId, image),
        Joined.with(Serdes.String(), Serdes.String(), imageSerde))
    // group back by newsId and collect the joined images
    .groupBy((imageId, imageWrapper) -> imageWrapper.getNewsId(), Grouped.with(Serdes.String(), imageWrapperSerde))
    .aggregate(NewsImages::new,
        (newsId, image, newsImages) -> {
            newsImages.setNewsId(newsId);
            newsImages.addImage(image);
            return newsImages;
        },
        Materialized.with(Serdes.String(), newsImagesSerde));
newsImagesKTable.toStream().to(topics.getNewsImagesTopic());
As expected, the code above aggregates all of the images for a news item.
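For reference, the topology uses ImageWrapper and NewsImages without showing them. The following is only a guess at their shape, inferred from the JSON samples in this question (the images map keyed by image id comes from the output format). The field names, constructors, and the use of a LinkedHashMap are assumptions, and whatever the serdes need (no-arg constructors, setters, annotations) is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value type for the images topic, mirroring the JSON fields.
class Image {
    private final String id;
    private final String url;
    private final String description;
    private final int height;
    private final int width;

    Image(String id, String url, String description, int height, int width) {
        this.id = id;
        this.url = url;
        this.description = description;
        this.height = height;
        this.width = width;
    }

    String getId() { return id; }
    String getUrl() { return url; }
}

// Hypothetical join result: an image plus the id of the news item that referenced it.
class ImageWrapper {
    private final String newsId;
    private final Image image;

    ImageWrapper(String newsId, Image image) {
        this.newsId = newsId;
        this.image = image;
    }

    String getNewsId() { return newsId; }
    Image getImage() { return image; }
}

// Hypothetical aggregate value: the news id and its images keyed by image id.
class NewsImages {
    private String newsId;
    private final Map<String, Image> images = new LinkedHashMap<>();

    void setNewsId(String newsId) { this.newsId = newsId; }

    // Adds or replaces the entry for this image id; nothing is ever removed.
    void addImage(ImageWrapper wrapper) {
        images.put(wrapper.getImage().getId(), wrapper.getImage());
    }

    String getNewsId() { return newsId; }
    Map<String, Image> getImages() { return images; }
}
```

With a shape like this, addImage only ever adds or replaces entries, so the aggregate has no way to know that an earlier image no longer belongs to the news item.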
When the author first publishes the news item with two images, it works well, and we see the following output:
"news-1" :
{
  "newsId": "news-1",
  "images": {
    "image-1": {"id": "image-1", "url": "https://www.mypublication.co.uk/image-1.jpeg", "description": "title-1 description", "height": 400, "width": 450},
    "image-2": {"id": "image-2", "url": "https://www.mypublication.co.uk/image-2.jpeg", "description": "title-2 description", "height": 400, "width": 450}
  }
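}
When the author republishes the article with only image-3, the output contains all three images (which is exactly what an aggregator does): news-1: image-1, image-2, image-3.
I am looking for any other way to join news and images that overwrites the previous value when the news item is republished, so that the result is news-1: image-3.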
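To make the difference concrete, here is a small plain-Java model of the two behaviours, with no Kafka involved. It is only an illustration: AggregationDemo, accumulate, and overwrite are hypothetical names, and the state store is reduced to a map from news id to image ids. The first method mirrors what the aggregator above does with each joined pair; the second is the behaviour I am after, where one publication replaces the stored value as a whole.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AggregationDemo {
    // Models the current aggregator: each joined (newsId, imageId) pair is added
    // to the existing state and nothing is ever removed.
    static void accumulate(Map<String, Set<String>> state, String newsId, String imageId) {
        state.computeIfAbsent(newsId, k -> new LinkedHashSet<>()).add(imageId);
    }

    // Models the desired behaviour (an assumption about what "overwrite" should mean):
    // a publication carries its full image list and replaces the stored value.
    static void overwrite(Map<String, Set<String>> state, String newsId, List<String> imageIds) {
        state.put(newsId, new LinkedHashSet<>(imageIds));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> accumulated = new LinkedHashMap<>();
        accumulate(accumulated, "news-1", "image-1");
        accumulate(accumulated, "news-1", "image-2");
        accumulate(accumulated, "news-1", "image-3"); // republished with image-3 only
        System.out.println(accumulated); // {news-1=[image-1, image-2, image-3]}

        Map<String, Set<String>> replaced = new LinkedHashMap<>();
        overwrite(replaced, "news-1", Arrays.asList("image-1", "image-2"));
        overwrite(replaced, "news-1", Arrays.asList("image-3")); // republished with image-3 only
        System.out.println(replaced); // {news-1=[image-3]}
    }
}
```

The per-pair form loses the information that image-3 arrived as part of a new publication, which is why the old images survive. How to keep that per-publication grouping inside the Kafka Streams topology is the part I have not worked out.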
Posted on 2021-12-05 21:35:46
https://stackoverflow.com/questions/70056895