System.Threading.Channels允许我们指定一个容量和完全模式= DropOldest。基本上,当通道满了,消息被处理了10秒时,在这10秒内,它将丢弃新的记录。
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.DropOldest
});有没有办法用Rx做这件事?
发布于 2022-06-03 08:03:07
Rx可观测值不具有“满”的特性。一个可观察的序列并不像Queue<T>或Channel<T>那样是消息的存储。它只是消息的生成器/传播者。一些Rx操作符有内部队列来执行他们的工作,例如Concat和Zip操作符。通常,这些队列是隐藏的,不能配置为“有损”。
可能具有您正在寻找的功能的Rx组件是ReplaySubject。这个组件可以配置它可以重播的最大消息数(int bufferSize),并且可以在丢弃它之前存储每条消息的最长时间(TimeSpan window)。如果您设置的是bufferSize,而不是window,那么ReplaySubject<T>最终会缓冲指定数量的项目,然后缓冲区将永远保持相同的大小。每条传入的消息都会导致删除最古老的缓冲消息。ReplaySubject<T>不是像Channel<T>那样的可消费队列。它总是准备好将其缓冲区中的所有消息传播到将来可能出现的任何新订阅服务器。
ReplaySubject<T>被Replay运算符用作传播器,类似于Publish运算符在内部由Subject<T>支持的方式。
发布于 2022-06-03 08:15:52
要添加到@的答案,频道可能变得“满”,因为你写在它们的一端,从另一端阅读。它们是独立的操作,缓冲区是一个定义的东西,您可以控制它的预期行为。
可观测数据应该立即传播任何通知--通常不存在任何保存区域。一些Rx操作符在处理过程中添加了一个,但是如果读取的东西和正在写入的东西之间的这种分离级别对您很重要,那么通道抽象可能更接近您的需要,而不是可观察的。
发布于 2022-06-03 09:23:18
您所描述的内容听起来类似于其他Rx实现(如RxJava)所称的“back pressure”。
这还没有在Rx.Net中实现,而且可能永远也不会实现。
https://stackoverflow.com/questions/72485994
复制相似问题