我正在尝试使用RxJS throttleTime实现等效的Rx.NET (它不是在.NET中实现的)。
throttleTime从可观察的源发出一个值,然后忽略后续的持续时间为毫秒的源值,然后重复此过程。这与throttle不同,后者在发射之前等待一段时间。throttleTime发出第一个项目,然后等待再次发出。
任何关于如何解决这一问题的建议都将不胜感激。
发布于 2017-11-23 06:50:13
这应该可以做到:
public static IObservable<T> ThrottleTime<T>(this IObservable<T> source, TimeSpan ts)
{
return ThrottleTime(source, ts, Scheduler.Default);
}
public static IObservable<T> ThrottleTime<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
return source
.Timestamp(scheduler)
.Scan((EmitValue: false, OpenTime: DateTimeOffset.MinValue, Item: default(T)), (state, item) => item.Timestamp > state.OpenTime
? (true, item.Timestamp + ts, item.Value)
: (false, state.OpenTime, item.Value)
)
.Where(t => t.EmitValue)
.Select(t => t.Item);
}说明:把ThrottleTime想象成有一个单一的状态变量:下次门打开到一个新的值时。如果源项在这个时间门值之前,那么什么都不会发生.如果源项在后面,则传递它,并将门值重置为最新的时间戳。
Scan以元组( OpenTime变量)保存时间值。元组上的其他字段在下游更有帮助。
https://stackoverflow.com/questions/47446745
复制相似问题