首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用C# Rx实现Redis流

如何使用C# Rx实现Redis流
EN

Stack Overflow用户
提问于 2020-05-25 23:50:16
回答 3查看 3.3K关注 0票数 1

因为我找不到任何不使用循环获取流内容的实现,所以我开始实现一个,但是我面临着一些问题,也许你们中的一些人可以指出正确的地方。

该实现使用Pub/Sub和流:* log ->流通道* log:notification ->发布/sub* log:lastReadMessage ->的组合,其中包含流中的最后一个读取键。

出版商

代码语言:javascript
复制
        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);

            while(true)
            {
                var value =  new NameValueEntry[]
                {
                    new NameValueEntry("id", Guid.NewGuid().ToString()),
                    new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
                };

                redisDb.StreamAdd("log", value);
                var publisher = connectionMultiplexer.GetSubscriber();
                publisher.Publish("log:notify", string.Empty, CommandFlags.None);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }

订阅者

代码语言:javascript
复制
        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);


            var observableStream =  CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
                .Subscribe(x => {
                  Console.WriteLine(x);  
                });

            Console.ReadLine();
        }
代码语言:javascript
复制
        private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }


            return Observable.Create<string>(obs => 
            {
                var subscriber = connection.GetSubscriber();
                subscriber.Subscribe($"{channel}:notify", async (ch, msg) => 
                {
                    var locker = await taskFromStreamBlocker
                        .WaitAsync(0)
                        .ConfigureAwait(false);

                    if (!locker)
                    {
                        return;
                    }

                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    taskFromStreamBlocker.Release();
                });

                return Disposable.Create(() => subscriber.Unsubscribe(channel));
            });
        }

为什么是信号量?

因为我可以将很多消息添加到流中,所以我不希望o将相同的消息处理两次。

问题

  1. 如果我们在流中有未处理的消息,那么当我们启动时,我们如何处理没有来自Pub/Sub的事件--我们可以验证它是否是未处理的消息并处理它。如果在此期间向流中添加了一条新消息,而我们尚未订阅Pub/sub,则在通过Pub/Sub接收通知之前,订阅者将不会处理该消息。
  2. 信号量对于不处理同一消息两次很重要,但同时也是一种诅咒。在消息处理过程中,可以将另一个消息添加到流中。当这种情况发生时,订阅者将不会立即处理,但只有在下次通知时才会处理(此时将处理两条消息)。

你将如何实现这一点?是否只有Rx实现了Redis流?该解决方案不应该使用某种循环,并且具有内存效率。这个是可能的吗?

谨致问候

保罗·阿博伊姆·平托

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-05-26 17:56:13

这是另一种使用200 is时间的计时器的解决方案。

代码语言:javascript
复制
        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            var instance = ThreadPoolScheduler.Instance;

            return Observable.Create<string>(obs => 
            {
                var disposable = Observable
                    .Interval(TimeSpan.FromMilliseconds(200), instance)
                    .Subscribe(async _ => 
                    {
                        var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                        foreach(var message in messages)
                        {
                            obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                            lastReadMessage = message.Id;
                        }

                        redisDb.KeyDelete($"{channel}:lastReadMessage");
                        redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                    });
                cancellationToken.Register(() => disposable.Dispose());

                return Disposable.Empty;    
            });
       }
票数 0
EN

Stack Overflow用户

发布于 2020-05-29 02:27:03

我用一个紧的循环,只需做一个XRange,保存一个位置-吻。但是,如果没有工作,它就会退却,所以当有很多事情发生的时候,它就会很快地停下来。

但是,如果您需要更高的性能(如在处理过程中阅读),那么在大多数情况下,我会提醒您不要这样做。

  1. ,它创造了很多的复杂性,这需要是坚如磐石的。
  2. Redis通常足够快的
  3. “我不想让相同的消息被处理两次。”几乎每一个系统都至少有一次交付,消除这一问题会让人的思维困难/缓慢。您可以通过使用一组ids来部分删除它,但是对于消费者来说,处理它和设计成幂等的消息非常微不足道。这可能是消息设计问题的根本原因。如果对每个读取器进行分区(单独的流和每个流的一个工作人员),则可以将哈希集保存在内存中,避免缩放/分布式问题。注意,Redis流可以保持顺序,使用它来生成更简单的幂等信息。
  4. 异常,您不想停止处理流,因为用户在1条消息上有一个逻辑异常(例如在整个系统停止的夜间接到呼叫),锁使情况更糟。事件数据不能改变其发生的情况,所以尽最大的努力。但是,下面的/ redis异常确实需要抛出并重新测试。在回路外管理这是非常painful.
  5. Simple的背压。如果您不能足够快地处理工作,循环就会减慢,而不是创建大量任务并炸掉所有内存。

我不再使用分布式锁/信号量了。

如果您处理的命令(如剂量,而不是xyz )已经发生,这些操作可能会失败。同样,消费者应该处理已经发生的情况,而不是红/流读取部分。

一些具有神奇回调功能的libs无法解决这些问题,当超时在任何节点上运行时,回调将重新尝试。复杂性/问题仍然存在,它们只是转移到其他地方。

对于消费者来说,您可能在上面有一个可以观察到的,但是这基本上是表面的,它不能解决问题,如果您在许多实现下看到相同的循环。我不会使用这个,而是让使用者注册一个操作。

例如

代码语言:javascript
复制
    public interface IStreamSubscriber
    {
        void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
        void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
        void Start();
    }    

在您的例子中,回调可能有可观察的,而不是使用循环,但是下面有一个低级别的循环,它还可以为使用者执行消息到对象的转换。

票数 1
EN

Stack Overflow用户

发布于 2020-05-26 16:31:56

这是我想要避免的解决方案

代码语言:javascript
复制
        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            return Observable.Create<string>(async obs => 
            {
                while(!cancellationToken.IsCancellationRequested)
                {
                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    await Task.Delay(TimeSpan.FromMilliseconds(500));
                }

                return Disposable.Empty;
            });
        }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62012495

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档