有没有办法在三叉树中动态创建拓扑?谁能举个例子?
发布于 2016-06-29 05:49:48
首先,您可能还知道创建拓扑不是三叉树的一部分。Trident只是一个用于微批处理的API。
根据定义,创建新拓扑是动态的。这就是TopologyBuilder类正在做的事情。
所以回答你的问题,是的,可以从三叉树创建新的拓扑,或者从简单的风暴喷口和螺栓创建新的拓扑。您唯一需要的是您的拓扑创建逻辑应该有权访问Storm集群(类和其他资源),如果您在Storm中运行您的逻辑,根据定义,这也是满足的。
您需要做的最后一件事是找到一种方法来提交新创建的拓扑,这就是创建StormSubmitter类的目的,当您在三叉树或普通插口/螺栓中运行逻辑时,这个类的定义再次满足于您的类路径。
出于好奇心,你为什么打算这么做?你的要求是什么?
示例:
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
public class DynamicTopologySpout implements IBatchSpout {
private static final long serialVersionUID = -3269435263455830842L;
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context) {}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
if (newTopologyNeeded()) {
TopologyBuilder builder = new TopologyBuilder();
builder
.setSpout("spout", new BaseRichSpout() {
private static final long serialVersionUID = 1L;
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
@Override public void nextTuple() {}
}, 1)
.setMaxSpoutPending(15)
.setNumTasks(1);
StormTopology topology = builder.createTopology();
Config config = new Config();
try {
StormSubmitter.submitTopology("dynamic-topology", config, topology);
} catch (Exception e) {
e.printStackTrace();
collector.reportError(e);
}
}
}
private boolean newTopologyNeeded() {
// Check if topology needed ...
return false;
}
@Override
public void ack(long batchId) {}
@Override
public void close() {}
@Override
public Map<String, Object> getComponentConfiguration() { return null; }
@Override
public Fields getOutputFields() { return null; }
}https://stackoverflow.com/questions/37926154
复制相似问题