首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Storm Bolt无法接收来自其他Bolt发出的任何元组

Apache Storm Bolt无法接收来自其他Bolt发出的任何元组
EN

Stack Overflow用户
提问于 2018-01-07 18:27:26
回答 1查看 443关注 0票数 0

我是暴风的新手。我想使用一个名为'tileClean‘的螺栓来发射单个Stream,同时使用另外五个螺栓来接收Stream。像这样:flow image

如您所见,“一、二、三、四、五”螺栓将同时接收相同的数据。但实际上,“一、二、三、四、五”螺栓无法接收任何数据。下面是我的代码:

代码语言:javascript
复制
@Override
public void execute(TupleWindow inputWindow) {
    logger.debug("clean");
    List<Tuple> tuples = inputWindow.get();
    //logger.debug("clean phrase. tuple size is : {}", tuples.size());
    for (Tuple input : tuples) {
        // some other code..

        //this._collector.emit(input, new Values(nal));
        this._collector.emit("stream_id_one", input, new Values(nal));
        this._collector.emit("stream_id_two", input, new Values(nal));
        this._collector.emit("stream_id_three", input, new Values(nal));
        this._collector.emit("stream_id_four", input, new Values(nal));
        this._collector.emit("stream_id_five", input, new Values(nal));

        this._collector.ack(input);
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(BoltConstant.EMIT_LOGOBJ));
    declarer.declareStream("stream_id_one", new Fields(BoltConstant.EMIT_LOGOBJ));
    declarer.declareStream("stream_id_two", new Fields(BoltConstant.EMIT_LOGOBJ));
    declarer.declareStream("stream_id_three", new Fields(BoltConstant.EMIT_LOGOBJ));
    declarer.declareStream("stream_id_four", new Fields(BoltConstant.EMIT_LOGOBJ));
    declarer.declareStream("stream_id_five", new Fields(BoltConstant.EMIT_LOGOBJ));
}

拓扑集是:

代码语言:javascript
复制
builder.setBolt("tileClean", cleanBolt, 1).shuffleGrouping("assembly");    
builder.setBolt("OneBolt", OneBolt, 1).shuffleGrouping("tileClean", "stream_id_one");
builder.setBolt("TwoBolt", TwoBolt, 1).shuffleGrouping("tileClean", "stream_id_two");
builder.setBolt("ThreeBolt", ThreeBolt, 1).shuffleGrouping("tileClean", "stream_id_three");
builder.setBolt("FourBolt", FourBolt, 1).shuffleGrouping("tileClean", "stream_id_four");
builder.setBolt("FiveBolt", FiveBolt, 1).shuffleGrouping("tileClean", "stream_id_five");

tileClean可以接收从assymble发出的元组,但其他螺栓不能接收。

我的代码有什么地方不正确吗?

EN

回答 1

Stack Overflow用户

发布于 2018-01-09 16:23:31

由于您省略了"for loop“语句和第一个collector.emit语句之间的代码,因此消息不能通过的可能性之一是在省略的代码之间进行适当的错误处理。您可以确保将try-catch块或调试放在"collector.emit“语句之前进行日志记录,以检查您的代码是否确实到达那里。

还可以在storm-ui上检查上面的内容,其中将显示在喷嘴/螺栓之间传输消息的拓扑度量。它还报告在两次任务执行之间可能发生的任何错误消息。

另一种可能性是在您使用多节点集群的情况下,如果您的任务分散在节点上(即,如果您在拓扑配置中分配了多个worker ),请确保机器可以在storm.yaml文件中配置的指定端口上通过网络相互通信。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48136248

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档