我们有一个关于任务在单一拓扑中的并行性的问题。我们无法获得一个良好的,流畅的处理率。
我们正在使用Kafka和Storm来构建一个具有不同拓扑的系统,在这个系统中,数据按照使用Kafka主题连接的一系列拓扑进行处理。
我们使用的是Kafka 1.0.0和Storm 1.2.1。
邮件数量很少,每天大约有2000条消息,但是每项任务都需要相当长的时间。特别是一个拓扑处理每个任务所需的时间是可变的,通常在1到20分钟之间。如果按顺序处理,则吞吐量不足以处理所有传入消息。所有拓扑和卡夫卡系统都安装在一台机器上(16核,16 GB内存)。
由于消息是独立的,可以并行处理,因此我们试图使用Storm并发功能来提高吞吐量。
为此,拓扑配置如下:
使用此配置,我们在此拓扑中观察到以下行为。
我们有两个主要问题。首先,即使有4个工作人员和10个并行性提示,也只启动了4-5个任务.第二,在工作未完成时,不再启动批处理,即使它只是一个任务。
这不是没有足够的工作要做的问题,因为我们在开始时尝试了插入2000项任务,所以有大量的工作要做。
我们试图增加参数"maxSpoutsPending",期望拓扑同时读取更多批,并对它们进行排队,但它们似乎是内部流水线的,而不是并发处理的。
拓扑是使用以下代码创建的:
private static StormTopology buildTopologyOD() {
//This is the marker interface BrokerHosts.
BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));
tridentConfigCorrelation.scheme = new RawMultiScheme();
tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));
OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);
TridentTopology topology = new TridentTopology();
Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
.shuffle()
.each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
.parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));
//Create a state Factory to produce outputs to kafka topics.
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(kafkaProperties)
.withKafkaTopicSelector(new ODTopicSelector())
.withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());
existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));
return topology.build();
}和配置创建为:
private static Config createConfig(boolean local) {
Config conf = new Config();
conf.setMaxSpoutPending(1); // Also tried 2..6
conf.setNumWorkers(4);
return conf;
}我们是否可以通过增加并行任务的数量或在完成处理批处理时避免饥饿来提高性能呢?
发布于 2019-03-08 11:30:07
我发现了Nathan关于为三叉戟设置并行性的风暴用户旧员额:
我建议使用" name“函数来命名流的各个部分,这样UI就可以显示哪些部分对应于哪些部分。 三叉戟将操作打包成尽可能少的螺栓。此外,它永远不会重新划分您的流,除非您已经完成了显式地涉及重新分区的操作(例如洗牌、groupBy、partitionBy、全局聚合等等)。三叉戟的这个属性确保您可以控制事物处理方式的排序/半排序。因此,在这种情况下,groupBy之前的所有东西都必须具有相同的并行性,否则三叉戟就必须重新划分流。既然你没说你想要流重新划分,它就不能这样做。通过引入重新分区操作,您可以获得喷口与每个流程的不同的并行性,如下所示: stream.parallelismHint(1).shuffle().each(…).each(…).parallelismHint(3).groupBy(…);
我认为您可能希望设置parallelismHint作为您的喷口以及您的.each。
关于同时处理多个批处理,您是正确的,这就是maxSpoutPending在三叉戟中的用途。试着在Storm中签入,您的max喷口挂起的值实际上已被取起。还尝试为MasterBatchCoordinator启用调试日志记录。您应该能够从日志记录中判断多个批是否同时运行。
当您说多个批没有并发处理时,您是指ProcessTask吗?请记住,三叉戟的一个属性是,状态更新是按批排序的。如果在运行中有maxSpoutPending=3和批处理1、2和3,三叉戟在写入批处理1之前不会发出更多批处理,此时它会再发出一批。因此,慢批可以阻止更多的释放,即使2和3被完全处理,它们也必须等待1完成并编写。
如果你不需要三叉戟的批次和订购行为,你可以尝试常规风暴。
更多的是附带说明,但是您可能需要考虑从storm-kafka迁移到storm-kafka-client。这对这个问题并不重要,但如果不这样做,您将无法升级到Kafka 2.x,而且在获得一组状态迁移之前会更容易。
https://stackoverflow.com/questions/55058793
复制相似问题