首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当添加空处理器时,Kafka流处理速度减慢。

当添加空处理器时,Kafka流处理速度减慢。
EN

Stack Overflow用户
提问于 2019-02-04 16:44:51
回答 1查看 237关注 0票数 0

考虑一下这个Kafka流驱动程序。

代码语言:javascript
复制
public class TestDriver {

    private static final String SOURCE = "SOURCE";

    public static void main(String[] args) throws Exception {

        ProtoDeserializer<Message> protoDeserializer = new ProtoDeserializer<>(Message.parser());
        ProtoSerializer<Message> protoSerializer = new ProtoSerializer<>();

        StringDeserializer stringDerializer = new StringDeserializer();
        StringSerializer stringSerializer = new StringSerializer();

        Topology topologyBuilder = new Topology();
        topologyBuilder.addSource(SOURCE, stringDerializer, protoDeserializer, "input-messages")

            .addProcessor(DummyProcessor.NAME, DummyProcessor::new, SOURCE)

            .addSink("MAIN", "output-messages", stringSerializer, protoSerializer, DummyProcessor.NAME)
        ;

        KafkaStreams streams = new KafkaStreams(topologyBuilder, getConfig());
        streams.cleanUp();
        streams.start();

        System.out.println(streams.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    }

    private static Properties getConfig() {
        Properties config = new Properties();
        config.put(StreamsConfig.CLIENT_ID_CONFIG, "test.stream-processor");
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test.stream-processor");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092,broker-2:9092,broker-3:9092");
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return config;
    }
}

问题是,当不向拓扑中添加处理器(不包括.addProcessor() )时,从源到接收器的处理速度是很好的(这意味着我目前产生25k条消息/s,它没有问题要跟上)。

但是,当添加DummyProcessor时,它突然处理3k消息/s(600 k字节)。

DummyProcessor基本上什么也不做:

代码语言:javascript
复制
public class DummyProcessor extends AbstractProcessor<String, Message> {

    public static final String NAME = "DUMMY_PROCESSOR";

    public void process(String key, Message originalMessage) {
        context().forward(key, originalMessage);
        context().commit();
    }
}

添加单个“空”处理器是否会增加流性能的开销?原因是什么?卡夫卡流是如此聪明吗?当没有处理器时,它不会执行原始服务器,只转发接收到的数据吗?总之要加快速度吗?

以这样的速度,我需要更多的cpu线程来处理我所有的数据,因为25k消息/s是我拥有的数据的1%。听起来挺多的。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-04 18:09:23

问题是由于经常请求提交而引起的。

您根本不需要打电话给ProcessorContext:commit()。基于commit.interval.ms属性的Kafka流执行提交(be default:30000 ms)。如果准确地设置了一次语义,则其不同的值。您可以在https://kafka.apache.org/documentation/#streamsconfigs中提供详细信息。

如果在某些用例中需要更频繁地提交,则可以调用ProcessorContext:commit()。但您必须记住,提交并不是立即(直接)完成的。它只在可能的情况下设置要提交的标志。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54520645

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档