我正在使用Apache的拓扑结构:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("socketspout", new SocketSpout(IP_HOST,PORT));
builder.setBolt("filterone", new FilterOne()).shuffleGrouping("socketspout");
builder.setBolt("filtertwo", new FilterTwo()).shuffleGrouping("filterone");第一个螺栓的方法是(FilteOne),这个类扩展了BaseRichBolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ID1","signal1"));
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)*2;
System.out.println("Filter one.."+Integer.toString(sig));
collector.emit("ack1", new Values(input.getString(0), sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}第二个螺栓的方法是(FilteTwo),这个类也扩展了BaseRichBolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)+1;
System.out.println("Filter two.."+Integer.toString(sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}当执行程序模式本地集群时,我可以看到第一个螺栓发出元组,但第二个线程永远不会接收元组.

发布于 2017-07-25 22:39:05
解决了将过滤器一段代码从collector.emit("ack1", new Values(input.getString(0), sig));修改为collector.emit( new Values(input.getString(0), sig));的问题。
发布于 2017-12-28 16:43:10
该方法的收集器可以设置如下:
collector.emit(input, new Values(input.getString(0), sig));不要忘记在方法declareOutputFields中为这个值设置字段名:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("myValue"));
}然后,在第二个螺栓中,尝试使用"myValue“字段获得值:
sig = input.getValueByField("myValue").getInteger(1)+1;https://stackoverflow.com/questions/45313961
复制相似问题