首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡-> DataStream -> MongoDB

卡夫卡-> DataStream -> MongoDB
EN

Stack Overflow用户
提问于 2016-02-02 16:26:33
回答 2查看 5.2K关注 0票数 11

我想设置Flink,这样它就可以将数据流从Apache转换并重定向到MongoDB。出于测试目的,我构建在flink流-Connectors.kafka示例(https://github.com/apache/flink)之上。

卡夫卡流是正确红色的链接,我可以映射他们等,但问题发生时,我想保存每一个接收和转换的信息到MongoDB。我发现的关于MongoDB集成的唯一例子是来自github的flink-mongodb测试。不幸的是,它使用的是静态数据源(数据库),而不是数据流。

我认为应该有一些DataStream.addSink实现的MongoDB,但显然没有。

实现这一目标的最佳途径是什么?我是否需要编写自定义接收器函数,还是可能遗漏了什么?也许应该用不同的方式来做?

我没有任何解决办法,所以任何建议都将不胜感激。

下面是一个例子,我作为输入得到了什么,作为输出我需要存储什么。

代码语言:javascript
复制
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>

Flink: DataStream.map({
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection

正如您在本例中所看到的,我使用Flink主要用于Kafka的消息流缓冲和一些基本的解析。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-02-02 17:00:35

目前在Flink中没有流MongoDB接收器可用。

但是,有两种方法可以将数据写入MongoDB:

  • 使用Flink的DataStream.write()调用。它允许您与流一起使用任何OutputFormat (来自批处理API)。使用Flink的HadoopOutputFormatWrapper,您可以使用正式的MongoDB Hadoop连接器
  • 你自己去执行水槽。使用流API实现接收器非常容易,我相信MongoDB有一个很好的Java库。

这两种方法都没有提供任何复杂的处理保证。但是,当您使用Flink和Kafka (并启用了检查点)时,您将至少有一次语义:在错误情况下,数据将再次流到MongoDB接收器。如果您正在进行幂等更新,则重新执行这些更新不应导致任何不一致。

如果您确实需要MongoDB的精确一次语义,那么您可能应该提交一个在Flink的JIRA并与社区讨论如何实现它。

票数 3
EN

Stack Overflow用户

发布于 2018-01-19 11:01:00

作为Robert答案的另一种选择,您可以再次将结果写入Kafka,然后使用维护好的kafka连接器之一删除MongoDB数据库中主题的内容。

卡夫卡,->,Flink,->,Kafka,->,Mongo/任何东西

使用这种方法,您可以确定“至少一次语义”的行为。

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35158683

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档