首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何设置TOPOLOGY_MAX_SPOUT_PENDING参数

如何设置TOPOLOGY_MAX_SPOUT_PENDING参数
EN

Stack Overflow用户
提问于 2015-09-28 04:21:01
回答 2查看 2.4K关注 0票数 3

在我的拓扑中,我从Kafka队列读取触发器消息。在接收到触发消息时,我需要向闪电发送大约4096条消息。在螺栓中,经过一些处理后,它将发布到另一个Kafka队列(另一个拓扑稍后将使用它)。

我正在尝试设置TOPOLOGY_MAX_SPOUT_PENDING参数来限制要发送的消息的数量。但我看到它没有任何效果。是因为我在一个nextTuple()方法中发出了所有的元组吗?如果是这样,应该做些什么呢?

EN

回答 2

Stack Overflow用户

发布于 2015-09-28 15:56:49

如果你正在阅读卡夫卡,你应该使用KafkaSpout,因为它充满了风暴。不要试图实现您自己的spout,相信我,我在生产中使用KafkaSpout,它工作得非常顺利。每个Kafka消息恰好生成一个元组。

正如您在this nice page from the manual上看到的,您可以像这样设置topology.max.spout.pending

代码语言:javascript
复制
Config conf = new Config();
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);

topology.max.spout.pending是为每个spout设置的,如果您有四个spout,那么拓扑中的非完整元组的最大数量将等于spout* topology.max.spout.pending的数量。

另一个技巧是,您应该使用storm UI来查看是否正确设置了topology.max.spout.pending

请记住,topology.max.spout.pending只是拓扑中未处理的元组的数量,拓扑永远不会停止消费来自kafka的消息,至少在生产系统上是这样……如果你想使用4096个批次,你需要在你的螺栓上实现缓存逻辑,或者使用其他东西而不是storm (面向微批次的东西)。

票数 2
EN

Stack Overflow用户

发布于 2015-09-28 04:40:23

要创建TOPOLOGY_MAX_SPOUT_PENDING,您需要启用容错机制(即,在Spouts中分配消息in,在Bolts中分配锚点和ack )。此外,如果每次调用Spout.nextTuple()时发出一个以上的元组,TOPOLOGY_MAX_SPOUT_PENDING将无法按预期工作。

由于更多的原因,这实际上是一种糟糕的做法,因此每个Spout.nextTuple()调用都会发出多个元组(有关更多详细信息,请参阅Why should I not loop or block in Spout.nextTuple() )。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/32812448

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档