在使用IAsyncEnumerable<T>时,有可能节流数据吗?我有一个数据流快速进入,我只对最后一个元素感兴趣,每N秒。
谢谢。
发布于 2022-06-16 04:32:42
有一个不同的端点返回最近的事件,而不是处理流,这似乎是有意义的。
如果您必须处理一个队列/流,您可以使用这些事件,并将每个新传入的事件分配给类似latest的内容,并在所需的间隔内读取该事件。
发布于 2022-06-16 04:53:52
一种解决方案是将IAsyncEnumerable<T>转换为IObservable<T>,并利用System.Reactive的力量。
首先,你需要一个转换器。我找不到内置的,所以我创造了我自己的
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
public class AsyncEnumerableToObservable<T> : IObservable<T>
{
private readonly IAsyncEnumerable<T> _source;
private readonly Subject<T> _subject = new();
public AsyncEnumerableToObservable(IAsyncEnumerable<T> source)
{
_source = source;
BeginConsume();
}
public IDisposable Subscribe(IObserver<T> observer) => _subject.Subscribe(observer);
private async void BeginConsume()
{
try
{
await foreach (var item in _source)
{
_subject.OnNext(item);
}
_subject.OnCompleted();
}
catch (Exception e)
{
_subject.OnError(e);
}
}
}
public static class AsyncEnumerableExtensions
{
public static IObservable<T> ToObservable<T>(this IAsyncEnumerable<T> source)
{
return new AsyncEnumerableToObservable<T>(source);
}
}使用此转换器,您可以使用myEnumerable.ToObservable()并使用Sample方法从System.Reactive进行节流。
static class Program
{
static async Task Main(string[] args)
{
IAsyncEnumerable<int> seq = CreateSeq();
seq.ToObservable().Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
await Task.Delay(TimeSpan.FromSeconds(10));
}
private static async IAsyncEnumerable<int> CreateSeq()
{
int i = 0;
while (true)
{
await Task.Yield();
i++;
yield return i;
}
}
}https://stackoverflow.com/questions/72640217
复制相似问题