我能够使用标准的spout,bolt组合来进行流聚合,并且在使用tick tuple来持久化数据以利用批处理的情况下工作得非常好。现在我自己在做一些失败管理(跟踪未保存的元组等)(也就是说不是来自storm的ootb )。
但我读到过,三叉树为您提供了更高的抽象性和更好的故障管理。我不明白的是,在三叉树中是否有tick tuple支持。基本上,我希望在内存中批处理当前一分钟左右的数据,并使用三叉树持久化前几分钟的任何聚合数据。
这里的任何指示或设计建议都会很有帮助。
谢谢
发布于 2015-12-16 06:32:23
实际上,微批处理是三叉树的一个内置特性。为此,您不需要任何tick tuple。当您的代码中包含类似以下内容时:
topology
.newStream("myStream", spout)
.partitionPersist(
ElasticSearchEventState.getFactoryFor(connectionProvider),
new Fields("field1", "field2"),
new ElasticSearchEventUpdater()
)(我在这里使用我的自定义ElasticSearch状态/更新器,您可以使用其他东西)
因此,当你有这样的东西时,在三叉树背后,将你的流分组到批中,并对这些批执行partitionPersist操作,而不是对单个元组执行。
如果您出于任何原因仍然需要tick tuple,只需创建您的tick spout,下面这样的代码对我很有效:
public class TickSpout implements IBatchSpout {
public static final String TIMESTAMP_FIELD = "timestamp";
private final long delay;
public TickSpout(long delay) {
this.delay = delay;
}
@Override
public void open(Map conf, TopologyContext context) {
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
Utils.sleep(delay);
collector.emit(new Values(System.currentTimeMillis()));
}
@Override
public void ack(long batchId) {
}
@Override
public void close() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
return new Fields(TIMESTAMP_FIELD);
}
}https://stackoverflow.com/questions/25340676
复制相似问题