首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在storm中将tick tuple与trident一起使用

在storm中将tick tuple与trident一起使用
EN

Stack Overflow用户
提问于 2014-08-16 21:35:33
回答 1查看 861关注 0票数 4

我能够使用标准的spout,bolt组合来进行流聚合,并且在使用tick tuple来持久化数据以利用批处理的情况下工作得非常好。现在我自己在做一些失败管理(跟踪未保存的元组等)(也就是说不是来自storm的ootb )。

但我读到过,三叉树为您提供了更高的抽象性和更好的故障管理。我不明白的是,在三叉树中是否有tick tuple支持。基本上,我希望在内存中批处理当前一分钟左右的数据,并使用三叉树持久化前几分钟的任何聚合数据。

这里的任何指示或设计建议都会很有帮助。

谢谢

EN

回答 1

Stack Overflow用户

发布于 2015-12-16 06:32:23

实际上,微批处理是三叉树的一个内置特性。为此,您不需要任何tick tuple。当您的代码中包含类似以下内容时:

代码语言:javascript
复制
topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider),
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater()
    )

(我在这里使用我的自定义ElasticSearch状态/更新器,您可以使用其他东西)

因此,当你有这样的东西时,在三叉树背后,将你的流分组到批中,并对这些批执行partitionPersist操作,而不是对单个元组执行。

如果您出于任何原因仍然需要tick tuple,只需创建您的tick spout,下面这样的代码对我很有效:

代码语言:javascript
复制
public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";
    private final long delay;

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/25340676

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档