首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pulsar客户端线程平衡

Pulsar客户端线程平衡
EN

Stack Overflow用户
提问于 2021-11-08 20:42:30
回答 1查看 77关注 0票数 1

我正在尝试实现一个具有多个生产者的Pulsar客户端,它在线程之间分配负载,但是不管在ioThreads()和listenerThreads()上传递的值是什么,它总是重载第一个线程(> 65% cpu,而其他线程完全空闲)。

我已经尝试了一些方法,包括每小时进行一次“动态重新平衡”(最后一种方法),但在过程中关闭它肯定不是最好的方法。

这是相关的代码

代码语言:javascript
复制
...
// pulsar client
pulsarClient = PulsarClient.builder() //
                           .operationTimeout(config.getAppPulsarTimeout(), TimeUnit.SECONDS) //
                           .ioThreads(config.getAppPulsarClientThreads()) //
                           .listenerThreads(config.getAppPulsarClientThreads()) //
                           .serviceUrl(config.getPulsarServiceUrl()).build();
...

private createProducers() {
    String strConsumerTopic = this.config.getPulsarTopicInput();
    List<Integer> protCasesList = this.config.getEventProtoCaseList();

    for (Integer e : protCasesList) {
        String topicName = config.getPulsarTopicOutput().concat(String.valueOf(e));
        LOG.info("Creating producer for topic: {}", topicName);

        Producer<byte[]> protobufProducer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
                .blockIfQueueFull(true).compressionType(CompressionType.NONE)
                .sendTimeout(config.getPulsarSendTimeout(), TimeUnit.SECONDS)
                .maxPendingMessages(config.getPulsarMaxPendingMessages()).create();

        this.mapLink.put(strConsumerTopic.concat(String.valueOf(e)), protobufProducer);
    }
}

public void closeProducers() {
    String strConsumerTopic = this.config.getPulsarTopicInput();
    List<Integer> protCasesList = this.config.getEventProtoCaseList();

    for (Integer e : protCasesList) {
        try {
            this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).close();
            LOG.info("{} producer correctly closed...",
                    this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName());
        } catch (PulsarClientException e1) {
            LOG.error("Producer: {} not closed cause: {}",
                    this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName(),
                    e1.getMessage());
        }
    }
}

public void rebalancePulsarThreads(boolean firstRun) {
    ThreadMXBean threadHandler = ManagementFactory.getThreadMXBean();
    ThreadInfo[] threadsInfo = threadHandler.getThreadInfo(threadHandler.getAllThreadIds());
    for (ThreadInfo threadInfo : threadsInfo) {
        if (threadInfo.getThreadName().contains("pulsar-client-io")) {
            // enable cpu time for all threads
            threadHandler.setThreadCpuTimeEnabled(true);
            // get cpu time for this specific thread
            long threadCPUTime = threadHandler.getThreadCpuTime(threadInfo.getThreadId());
            int thresholdCPUTime = 65;
            if (threadCPUTime > thresholdCPUTime) {
                LOG.warn("Pulsar client thread with CPU time greater than {}% - REBALANCING now", thresholdCPUTime);
                try {
                    closeProducers();

                } catch (Exception e) {
                    if (!firstRun) {
                        // producers will not be available in the first run
                        // therefore, the logging only happens when it is not the first run
                        LOG.warn("Unable to close Pulsar client threads on rebalancing: {}", e.getMessage());
                    }
                }

                try {
                    createPulsarProducers();

                } catch (Exception e) {
                    LOG.warn("Unable to create Pulsar client threads on rebalancing: {}", e.getMessage());
                }
            }
        }
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-11-09 16:27:13

根据您的描述,最有可能的情况是,您正在使用的所有主题都由一个代理提供服务。

如果确实如此,并避免跨代理的主题负载平衡,它使用单个线程是正常的,因为所有这些生产者将共享单个池化的TCP连接,并且每个连接被分配给一个IO线程(侦听器线程用于消费者侦听器)。

如果您想强制更多的线程,您可以增加“每个代理的最大TCP连接”设置,以便使用所有已配置的IO线程。

例如:

代码语言:javascript
复制
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .ioThreads(16)
    .connectionsPerBroker(16)
    .create();
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69889739

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档