首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >条件delay+throttle算子

条件delay+throttle算子
EN

Stack Overflow用户
提问于 2021-05-02 20:32:50
回答 2查看 204关注 0票数 0

我正在编写一个自定义RX运算符,它结合了Throttle和Delay的特性,并具有以下签名

代码语言:javascript
复制
public static IObservable<T> DelayWhen(this IObservable<T> self, TimeSpan delay, Func<T, bool> condition);

这些规则如下:

如果immediately.

  • If

  • 返回false,发出condition(t)返回true,延迟为delay时间。

  • 如果self在延迟期间发出一个值,则执行以下操作:g 113H 114如果condition(t)返回false,取消/跳过计划延迟发射的值,如果delay).

返回true,则发出新的值

  • ,然后跳过/忽略这个新值(即,如果self

  • 期间不发出任何值,则延迟值将发出)。

从规则中可以看出,这里有一些让人想起节流的行为。

我解决这个问题的各种尝试包括一些刚刚发展到复杂的async方法。我真的觉得用现有的运算符应该可以解决这个问题。参见https://stackoverflow.com/a/16290788/2149075,它非常巧妙地使用了Amb,我觉得它非常接近我想要实现的目标。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-05-03 13:12:44

问题并不完全清楚,所以使用下面的测试用例作为场景:

代码语言:javascript
复制
Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(10)
    .DelayWhen(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0)

这应产生以下结果:

代码语言:javascript
复制
//        T: ---1---2---3---4---5---6---7---8---9---0---1----
// original: ---0---1---2---3---4---5---6---7---8---9
//   delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
// expected: -------1---------2-----5-------7-------------8
//
// 0: Delayed, but interrupted by 1, 
// 1: Non-delayed, emit immediately
// 2: Delayed, emit after 1.5 seconds
// 3: Delayed, since emitted during a delay, ignored
// 4: Delayed, but interrupted by 5.
// 5: Non-delayed, emit immediately
// 6: Delayed, but interrupted by 7.
// 7: Non-delayed, emit immediately
// 8: Delayed, but interrupted by 9
// 9: Delayed, since emitted during a delay, ignored

如果这不符合要求,请澄清问题。@Theodore的解决方案获得了正确的时间安排,但将发射3和9,忽略了“取消/跳过延迟排放的排定值并释放新值”子句。

这在功能上相当于Theodore的代码,但(IMO)更易于使用和理解:

代码语言:javascript
复制
public static IObservable<T> DelayWhen2<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
    return source
        .Select(x => (Item: x, WithDelay: condition(x)))
        .Publish(published => published
            .SelectMany(t => t.WithDelay 
                ? Observable.Return(t)
                    .Delay(delay, scheduler)
                    .TakeUntil(published.Where(t2 => !t2.WithDelay))
                : Observable.Return(t)
            )
        )
        .Select(e => e.Item);
}

从那时起,我必须嵌入您是否延迟使用.Scan的状态

代码语言:javascript
复制
public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition)
{
    return DelayWhen3(source, delay, condition, Scheduler.Default);
}

public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
{
    return source
        .Select(x => (Item: x, WithDelay: condition(x)))
        .Publish(published => published
            .Timestamp(scheduler)
            .Scan((delayOverTime: DateTimeOffset.MinValue, output: Observable.Empty<T>()), (state, t) => {
                if(!t.Value.WithDelay)  
                    //value isn't delayed, current delay status irrelevant, emit immediately, and cancel previous delay.
                    return (DateTimeOffset.MinValue, Observable.Return(t.Value.Item));
                else
                    if (state.delayOverTime > t.Timestamp)
                        //value should be delayed, but current delay already in progress. Ignore value.
                        return (state.delayOverTime, Observable.Empty<T>());
                    else
                        //value should be delayed, no delay in progress. Set delay state, and return delayed observable.
                        return (t.Timestamp + delay, Observable.Return(t.Value.Item).Delay(delay, scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
            })
        )
        .SelectMany(t => t.output);
}

.Scan运算符中,嵌入上一个Delay过期的时间。通过这种方式,您知道可以处理应该在现有延迟内延迟的值。我在时间敏感函数中添加了scheduler参数以启用测试:

代码语言:javascript
复制
var ts = new TestScheduler();

var target = Observable.Interval(TimeSpan.FromSeconds(1), ts)
    .Take(10)
    .DelayWhen3(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0, ts);

var observer = ts.CreateObserver<long>();
target.Subscribe(observer);
ts.Start();

var expected = new List<Recorded<Notification<long>>> {
    new Recorded<Notification<long>>(2000.MsTicks(), Notification.CreateOnNext<long>(1)),
    new Recorded<Notification<long>>(4500.MsTicks(), Notification.CreateOnNext<long>(2)),
    new Recorded<Notification<long>>(6000.MsTicks(), Notification.CreateOnNext<long>(5)),
    new Recorded<Notification<long>>(8000.MsTicks(), Notification.CreateOnNext<long>(7)),
    new Recorded<Notification<long>>(10500.MsTicks(), Notification.CreateOnNext<long>(8)),
    new Recorded<Notification<long>>(10500.MsTicks() + 1, Notification.CreateOnCompleted<long>()),
};

ReactiveAssert.AreElementsEqual(expected, observer.Messages);

以及MsTicks的代码:

代码语言:javascript
复制
public static long MsTicks(this int i)
{
    return TimeSpan.FromMilliseconds(i).Ticks;
}
票数 3
EN

Stack Overflow用户

发布于 2021-05-03 04:41:27

下面是DelayWhen操作符的一个实现,它基于内置的Window运算符:

更新:最初的实现(Revision 1)不能满足问题的要求,所以我用一个定制的延迟/节流操作符替换了Delay操作符。

代码语言:javascript
复制
/// <summary>
/// Either delays the emission of the elements that satisfy the condition, by the
/// specified time duration, or ignores them, in case they are produced before
/// the emission of previously delayed element. Elements that don't satisfy the
/// condition are emitted immediately, and they also cancel any pending emission of
/// all previously delayed elements.
/// </summary>
public static IObservable<T> DelayWhen<T>(this IObservable<T> source,
    TimeSpan delay, Func<T, bool> condition, IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return source
        .Select(x => (Item: x, WithDelay: condition(x)))
        .Publish(published => published.Window(published.Where(e => !e.WithDelay)))
        .Select(w => Observable.Merge(
            DelayThrottleSpecial(w.Where(e => e.WithDelay), delay, scheduler),
            w.Where(e => !e.WithDelay)
        ))
        .Switch()
        .Select(e => e.Item);

    /// <summary>
    /// Time shifts the observable sequence by the specified time duration, ignoring
    /// elements that are produced while a previous element is scheduled for emission.
    /// </summary>
    static IObservable<T2> DelayThrottleSpecial<T2>(IObservable<T2> source,
        TimeSpan dueTime, IScheduler scheduler)
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(x =>
        {
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return Observable.Return(x)
                    .DelaySubscription(dueTime, scheduler)
                    .Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<T2>();
        });
    }
}

源序列在连续的窗口(子序列)中进行分区,每个窗口以一个false (非延迟)元素结尾。然后将每个窗口投影到一个新窗口,该窗口根据需求延迟/节流其true (延迟)元素。最后,通过使用Switch操作符将投影窗口合并回单个序列,以便每次发出新窗口时都丢弃窗口中所有挂起的元素。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67360865

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档