首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rx.NET "gate“操作符

Rx.NET "gate“操作符
EN

Stack Overflow用户
提问于 2018-06-03 21:21:18
回答 2查看 155关注 0票数 2

注意:如果重要的话,我使用的是3.1。此外,我已经在codereview上问过这个问题,但到目前为止还没有回应。

我需要一个操作符来允许一个布尔流作为另一个流的门(当门流为真时,让值通过,当门流为假时,丢弃它们)。我通常会使用Switch,但如果源流是冷的,它会不断地重新创建它,这是我不想要的。

我还想自己清理一下,这样如果源或门都完成了,结果就会完成。

代码语言:javascript
复制
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    var s = source.Publish().RefCount();
    var g = gate.Publish().RefCount();

    var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
    var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);

    var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);

    var flag = false;
    g.TakeUntil(anyCompleted).Subscribe(value => flag = value);

    return s.Where(_ => flag).TakeUntil(anyCompleted);
}

除了总体的冗长之外,我不喜欢我订阅gate,即使结果从来没有订阅过(在这种情况下,这个操作符应该是一个no-op)。有没有办法摆脱那些订阅者?

我也尝试过这个实现,但当涉及到自我清理时,它甚至更糟糕:

代码语言:javascript
复制
return Observable.Create<T>(
    o =>
    {
        var flag = false;
        gate.Subscribe(value => flag = value);

        return source.Subscribe(
            value =>
            {
                if (flag) o.OnNext(value);
            });
    });

以下是我用来检查实现的测试:

代码语言:javascript
复制
[TestMethod]
public void TestMethod1()
{
    var output = new List<int>();

    var source = new Subject<int>();
    var gate = new Subject<bool>();

    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));

    // the gate starts with false, so the source events are ignored
    source.OnNext(1);
    source.OnNext(2);
    source.OnNext(3);
    CollectionAssert.AreEqual(new int[0], output);

    // setting the gate to true will let the source events pass
    gate.OnNext(true);
    source.OnNext(4);
    CollectionAssert.AreEqual(new[] { 4 }, output);
    source.OnNext(5);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);

    // setting the gate to false stops source events from propagating again
    gate.OnNext(false);
    source.OnNext(6);
    source.OnNext(7);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);

    // completing the source also completes the result
    source.OnCompleted();
    CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}

[TestMethod]
public void TestMethod2()
{
    // completing the gate also completes the result
    var output = new List<int>();

    var source = new Subject<int>();
    var gate = new Subject<bool>();

    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));

    gate.OnCompleted();
    CollectionAssert.AreEqual(new[] { -1 }, output);
}
EN

回答 2

Stack Overflow用户

发布于 2018-06-04 20:58:56

这是可行的:

代码语言:javascript
复制
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    return
        source.Publish(ss => gate.Publish(gs =>
            gs
                .Select(g => g ? ss : ss.IgnoreElements())
                .Switch()
                .TakeUntil(Observable.Amb(
                    ss.Select(s => true).Materialize().LastAsync(),
                    gs.Materialize().LastAsync()))));
}

这两个测试都通过了。

票数 2
EN

Stack Overflow用户

发布于 2018-06-03 21:41:08

你和Observable.Create的关系是正确的。您应该从可观察对象的两个订阅中调用onError和onCompleted,以便正确完成或在需要时出错。另外,通过在Create委托中返回两个IDisposable,如果您打算在sourcegate完成之前处理When订阅,则可以确保两个订阅都被正确清理。

代码语言:javascript
复制
    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return Observable.Create<T>(
            o =>
            {
                var flag = false;
                var gs = gate.Subscribe(
                    value => flag = value,
                    e => o.OnError(e),
                    () => o.OnCompleted());

                var ss = source.Subscribe(
                    value =>
                    {
                        if (flag) o.OnNext(value);
                    },
                    e => o.OnError(e), 
                    () => o.OnCompleted());

                return new CompositeDisposable(gs, ss);
            });
    }

一个更短,但更难阅读的版本,只使用Rx运算符。对于冷可观察对象,它可能需要源的发布/引用计数。

代码语言:javascript
复制
    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return gate
            .Select(g => g ? source : source.IgnoreElements())
            .Switch()
            .TakeUntil(source.Materialize()
                             .Where(s => s.Kind == NotificationKind.OnCompleted));
    }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50666864

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档