我想设置Flink,这样它就可以将数据流从Apache转换并重定向到MongoDB。出于测试目的,我构建在flink流-Connectors.kafka示例(https://github.com/apache/flink)之上。
卡夫卡流是正确红色的链接,我可以映射他们等,但问题发生时,我想保存每一个接收和转换的信息到MongoDB。我发现的关于MongoDB集成的唯一例子是来自github的flink-mongodb测试。不幸的是,它使用的是静态数据源(数据库),而不是数据流。
我认为应该有一些DataStream.addSink实现的MongoDB,但显然没有。
实现这一目标的最佳途径是什么?我是否需要编写自定义接收器函数,还是可能遗漏了什么?也许应该用不同的方式来做?
我没有任何解决办法,所以任何建议都将不胜感激。
下面是一个例子,我作为输入得到了什么,作为输出我需要存储什么。
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection正如您在本例中所看到的,我使用Flink主要用于Kafka的消息流缓冲和一些基本的解析。
发布于 2016-02-02 17:00:35
目前在Flink中没有流MongoDB接收器可用。
但是,有两种方法可以将数据写入MongoDB:
DataStream.write()调用。它允许您与流一起使用任何OutputFormat (来自批处理API)。使用Flink的HadoopOutputFormatWrapper,您可以使用正式的MongoDB Hadoop连接器这两种方法都没有提供任何复杂的处理保证。但是,当您使用Flink和Kafka (并启用了检查点)时,您将至少有一次语义:在错误情况下,数据将再次流到MongoDB接收器。如果您正在进行幂等更新,则重新执行这些更新不应导致任何不一致。
如果您确实需要MongoDB的精确一次语义,那么您可能应该提交一个在Flink的JIRA并与社区讨论如何实现它。
发布于 2018-01-19 11:01:00
作为Robert答案的另一种选择,您可以再次将结果写入Kafka,然后使用维护好的kafka连接器之一删除MongoDB数据库中主题的内容。
卡夫卡,->,Flink,->,Kafka,->,Mongo/任何东西
使用这种方法,您可以确定“至少一次语义”的行为。
https://stackoverflow.com/questions/35158683
复制相似问题