首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka消费者本地批处理队列内存泄漏

Kafka消费者本地批处理队列内存泄漏
EN

Stack Overflow用户
提问于 2022-09-23 05:21:29
回答 1查看 187关注 0票数 0

我的服务通过10个消费者提供一个主题。在将一批新消息添加到群集主题后,非托管内存将立即增长(图1)。即使经历了很长一段时间的不活动,记忆仍在增长。

当我向主题发送500 k消息并启动服务时,我看到了以下内容:

当更改参数时,我确定这是由于使用者的本地队列所致:

QueuedMinMessages -每个topic+partition库试图在本地消费者队列中维护的最小消息数。(缺省值: 100000;我的值: 100)

QueuedMaxMessagesKbytes -本地使用者队列中排队预取消息的最大千字节数。如果使用高级使用者,则此设置适用于单个使用者队列,而不管分区的数目如何。当使用遗留的简单使用者或使用单独的分区队列时,此设置适用于每个分区。此值可能会被fetch.message.max.bytes过高。此属性比queued.min.messages具有更高的优先级。(违约率: 65536;我的价值: 30000)

在更改这些参数并重新启动服务(500 k消息保留在主题中)之后:

降低这些参数的值只会增加内存的填充时间,但不能解决泄漏问题。由于某种原因,本地kafka队列没有清除处理过的消息。

Сonsumer代码:

代码语言:javascript
复制
private async Task StartConsumer(CancellationToken stoppingToken)
    {
        try
        {
            using (var consumer = new ConsumerBuilder<string, string>(_consumerConfig)
                       .SetErrorHandler((_, e) => _logger.LogError($"Error: {e.Reason}"))
                       .Build())
            {
                
                consumer.Subscribe(_topicName);
                while (!stoppingToken.IsCancellationRequested)
                {
                    ConsumeResult<string, string> result = null;
                    try
                    {
                        result = consumer.Consume();
                        if (result == null) continue;
                        var message = result.Message.Value;
                            Console.WriteLine($"Consumed message '{message}' at '{result.TopicPartitionOffset}'");
                            if (message != null)
                            {
                                T deserializedMessage = JsonConvert.DeserializeObject<T>(message);
                                if (deserializedMessage != null)
                                {
                                    var handler = await _managerFactory.CreateHandler(_topicName);
                                    await handler.HandleAsync(deserializedMessage, _topicName);
                                }
                            }
                            else
                            {
                                _logger.LogInformation("Processed empty message from Kafka");
                            }
                            _logger.LogInformation($"Processed message from Kafka");
                            consumer.Commit(result);
                    }
                    catch (OracleException ex)
                    {
                        _logger.LogError(ex, "OracleException" + '\n' + ex.Message + '\n' + ex.InnerException);
                        ProcessFailureMessage(result.Message);
                    }
                    catch (ConsumeException ex)
                    {
                        _logger.LogError(ex, "ConsumerException" + '\n' + ex.Message + '\n' + ex.InnerException);
                    }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Exception" + '\n' + ex.Message + '\n' + ex.InnerException);
                    }
                }

            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Kafka connection error");
        }

    }

使用者配置:

代码语言:javascript
复制
"RequestTimeoutMs": 60000,
"TransactionTimeoutMs": 300000,
"SessionTimeoutMs": 300000,
"EnableAutoCommit": false,
"QueuedMinMessages": 100,
"QueuedMaxMessagesKbytes": 30000,
"AutoOffsetReset": "Earliest",
"AllowAutoCreateTopics": true,
"PartitionAssignmentStrategy": "RoundRobin"

confluent-kafka-dotnet版本1.9.3

UPD 1:像长时间运行的任务一样调用StartConsumer():

代码语言:javascript
复制
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    for (int i = 0; i < _consumersCount; i++)
    {
        Task.Factory.StartNew(() => StartConsumer(stoppingToken),
            stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    return Task.CompletedTask;
}
EN

回答 1

Stack Overflow用户

发布于 2022-09-28 19:24:37

签出这个库,看看问题是否仍然存在:https://github.com/soucore/Reactive.Kafka.Client

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73823315

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档