首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在.NET核心上正确实现卡夫卡消费者后台服务

如何在.NET核心上正确实现卡夫卡消费者后台服务
EN

Stack Overflow用户
提问于 2019-06-24 09:39:25
回答 1查看 8K关注 0票数 6

我通过在BackgroundService Core2.2上使用.NET实现了一个作为控制台应用的卡夫卡消费者。我使用Content-Kafka-DotNetv1.0.1.1作为Apache的客户端。我不太清楚如何处理每一条信息。

  1. 由于每条消息的处理可能需要一些时间(最多24小时),所以我将为每条消息启动一个新的任务,这样我就不会阻止使用者使用新的消息。我认为,如果我有太多的信息,创建一个新的任务,每次不是正确的方式。那么,处理每条信息的正确方法是什么?是否可以为每条消息创建某种动态的后台服务?
  2. 如果已经处理了一条消息,但应用程序崩溃或重新平衡发生,那么我将不止一次地使用和处理相同的消息。我应该自动提交偏移量(或者在它被消耗后)并将消息(或任务)的状态存储在某个地方,比如在数据库中?

我知道有火,但我不确定我是否需要用它。如果我目前的做法是完全错误的,请给我一些建议。

下面是ConsumerService的实现:

代码语言:javascript
复制
public class ConsumerService : BackgroundService
{
    private readonly IConfiguration _config;
    private readonly IElasticLogger _logger;
    private readonly ConsumerConfig _consumerConfig;
    private readonly string[] _topics;
    private readonly double _maxNumAttempts;
    private readonly double _retryIntervalInSec;

    public ConsumerService(IConfiguration config, IElasticLogger logger)
    {
        _config = config;
        _logger = logger;
        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
            GroupId = _config.GetValue<string>("Kafka:GroupId"),
            EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
            AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
        };
        _topics = _config.GetValue<string>("Kafka:Consumer:Topics").Split(',');
        _maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
        _retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Console.WriteLine("!!! CONSUMER STARTED !!!\n");
        
        // Starting a new Task here because Consume() method is synchronous
        var task = Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);

        return task;
    }

    private void ProcessQueue(CancellationToken stoppingToken)
    {
        using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig).SetValueDeserializer(new MessageDeserializer()).Build())
        {
            consumer.Subscribe(_topics);

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(stoppingToken);

                        // Don't want to block consume loop, so starting new Task for each message  
                        Task.Run(async () =>
                        {
                            var currentNumAttempts = 0;
                            var committed = false;

                            var response = new Response();

                            while (currentNumAttempts < _maxNumAttempts)
                            {
                                currentNumAttempts++;

                                // SendDataAsync is a method that sends http request to some end-points
                                response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);

                                if (response != null && response.Code >= 0)
                                {
                                    try
                                    {
                                        consumer.Commit(consumeResult);
                                        committed = true;
                                        
                                        break;
                                    }
                                    catch (KafkaException ex)
                                    {
                                        // log
                                    }
                                }
                                else
                                {
                                    // log
                                }
                                
                                if (currentNumAttempts < _maxNumAttempts)
                                {
                                    // Delay between tries
                                    await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec));
                                }
                            }
                                                    
                            if (!committed)
                            {
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException ex)
                                {
                                    // log
                                }
                            }
                        }, stoppingToken);
                    }
                    catch (ConsumeException ex)
                    {
                        // log
                    }
                }
            }
            catch (OperationCanceledException ex)
            {
                // log
                consumer.Close();
            }
        }
    }
}
EN

回答 1

Stack Overflow用户

发布于 2021-01-18 15:52:27

同意Fabio的观点,您不应该使用Task.Run来处理消息,因为最终会有大量线程浪费资源并切换它们的执行,从而导致性能下降。

此外,在同一个线程中处理所消耗的消息也是可以的,因为Kafka使用了拉模型,并且您的应用程序可以以自己的速度处理消息。

关于处理消息不止一次,我建议存储已处理消息的偏移量,以跳过已经处理的消息。由于偏移量是一个基于长的数字,所以您可以很容易地跳过偏移量小于之前提交的消息。当然,只有当您有一个分区时,这才能工作,因为Kafka在分区级别上提供了偏移计数器和顺序保证。

你可以在我的文章中找到卡夫卡消费者的例子。如果你有问题,请随便问,我很乐意帮助你

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56733810

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档