我的服务通过10个消费者提供一个主题。在将一批新消息添加到群集主题后,非托管内存将立即增长(图1)。即使经历了很长一段时间的不活动,记忆仍在增长。

当我向主题发送500 k消息并启动服务时,我看到了以下内容:

当更改参数时,我确定这是由于使用者的本地队列所致:
QueuedMinMessages -每个topic+partition库试图在本地消费者队列中维护的最小消息数。(缺省值: 100000;我的值: 100)
QueuedMaxMessagesKbytes -本地使用者队列中排队预取消息的最大千字节数。如果使用高级使用者,则此设置适用于单个使用者队列,而不管分区的数目如何。当使用遗留的简单使用者或使用单独的分区队列时,此设置适用于每个分区。此值可能会被fetch.message.max.bytes过高。此属性比queued.min.messages具有更高的优先级。(违约率: 65536;我的价值: 30000)
在更改这些参数并重新启动服务(500 k消息保留在主题中)之后:

降低这些参数的值只会增加内存的填充时间,但不能解决泄漏问题。由于某种原因,本地kafka队列没有清除处理过的消息。
Сonsumer代码:
private async Task StartConsumer(CancellationToken stoppingToken)
{
try
{
using (var consumer = new ConsumerBuilder<string, string>(_consumerConfig)
.SetErrorHandler((_, e) => _logger.LogError($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(_topicName);
while (!stoppingToken.IsCancellationRequested)
{
ConsumeResult<string, string> result = null;
try
{
result = consumer.Consume();
if (result == null) continue;
var message = result.Message.Value;
Console.WriteLine($"Consumed message '{message}' at '{result.TopicPartitionOffset}'");
if (message != null)
{
T deserializedMessage = JsonConvert.DeserializeObject<T>(message);
if (deserializedMessage != null)
{
var handler = await _managerFactory.CreateHandler(_topicName);
await handler.HandleAsync(deserializedMessage, _topicName);
}
}
else
{
_logger.LogInformation("Processed empty message from Kafka");
}
_logger.LogInformation($"Processed message from Kafka");
consumer.Commit(result);
}
catch (OracleException ex)
{
_logger.LogError(ex, "OracleException" + '\n' + ex.Message + '\n' + ex.InnerException);
ProcessFailureMessage(result.Message);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "ConsumerException" + '\n' + ex.Message + '\n' + ex.InnerException);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Exception" + '\n' + ex.Message + '\n' + ex.InnerException);
}
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Kafka connection error");
}
}使用者配置:
"RequestTimeoutMs": 60000,
"TransactionTimeoutMs": 300000,
"SessionTimeoutMs": 300000,
"EnableAutoCommit": false,
"QueuedMinMessages": 100,
"QueuedMaxMessagesKbytes": 30000,
"AutoOffsetReset": "Earliest",
"AllowAutoCreateTopics": true,
"PartitionAssignmentStrategy": "RoundRobin"confluent-kafka-dotnet版本1.9.3
UPD 1:像长时间运行的任务一样调用StartConsumer():
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
for (int i = 0; i < _consumersCount; i++)
{
Task.Factory.StartNew(() => StartConsumer(stoppingToken),
stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
return Task.CompletedTask;
}发布于 2022-09-28 19:24:37
签出这个库,看看问题是否仍然存在:https://github.com/soucore/Reactive.Kafka.Client
https://stackoverflow.com/questions/73823315
复制相似问题