我通过在BackgroundService Core2.2上使用.NET实现了一个作为控制台应用的卡夫卡消费者。我使用Content-Kafka-DotNetv1.0.1.1作为Apache的客户端。我不太清楚如何处理每一条信息。
我知道有火,但我不确定我是否需要用它。如果我目前的做法是完全错误的,请给我一些建议。
下面是ConsumerService的实现:
public class ConsumerService : BackgroundService
{
private readonly IConfiguration _config;
private readonly IElasticLogger _logger;
private readonly ConsumerConfig _consumerConfig;
private readonly string[] _topics;
private readonly double _maxNumAttempts;
private readonly double _retryIntervalInSec;
public ConsumerService(IConfiguration config, IElasticLogger logger)
{
_config = config;
_logger = logger;
_consumerConfig = new ConsumerConfig
{
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
GroupId = _config.GetValue<string>("Kafka:GroupId"),
EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
};
_topics = _config.GetValue<string>("Kafka:Consumer:Topics").Split(',');
_maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
_retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
Console.WriteLine("!!! CONSUMER STARTED !!!\n");
// Starting a new Task here because Consume() method is synchronous
var task = Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);
return task;
}
private void ProcessQueue(CancellationToken stoppingToken)
{
using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig).SetValueDeserializer(new MessageDeserializer()).Build())
{
consumer.Subscribe(_topics);
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
// Don't want to block consume loop, so starting new Task for each message
Task.Run(async () =>
{
var currentNumAttempts = 0;
var committed = false;
var response = new Response();
while (currentNumAttempts < _maxNumAttempts)
{
currentNumAttempts++;
// SendDataAsync is a method that sends http request to some end-points
response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);
if (response != null && response.Code >= 0)
{
try
{
consumer.Commit(consumeResult);
committed = true;
break;
}
catch (KafkaException ex)
{
// log
}
}
else
{
// log
}
if (currentNumAttempts < _maxNumAttempts)
{
// Delay between tries
await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec));
}
}
if (!committed)
{
try
{
consumer.Commit(consumeResult);
}
catch (KafkaException ex)
{
// log
}
}
}, stoppingToken);
}
catch (ConsumeException ex)
{
// log
}
}
}
catch (OperationCanceledException ex)
{
// log
consumer.Close();
}
}
}
}发布于 2021-01-18 15:52:27
同意Fabio的观点,您不应该使用Task.Run来处理消息,因为最终会有大量线程浪费资源并切换它们的执行,从而导致性能下降。
此外,在同一个线程中处理所消耗的消息也是可以的,因为Kafka使用了拉模型,并且您的应用程序可以以自己的速度处理消息。
关于处理消息不止一次,我建议存储已处理消息的偏移量,以跳过已经处理的消息。由于偏移量是一个基于长的数字,所以您可以很容易地跳过偏移量小于之前提交的消息。当然,只有当您有一个分区时,这才能工作,因为Kafka在分区级别上提供了偏移计数器和顺序保证。
你可以在我的文章中找到卡夫卡消费者的例子。如果你有问题,请随便问,我很乐意帮助你
https://stackoverflow.com/questions/56733810
复制相似问题