我需要一个操作符来允许一个布尔流充当另一个流的门(当门流为真时,让值传递,当它为false时,将其丢弃)。我通常会为此使用开关,但是如果源流是冷的,它将继续重新创建它,这是我不希望的。
我还想清理自己,以便结果完成,如果任何一个源或门完成。
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
var s = source.Publish().RefCount();
var g = gate.Publish().RefCount();
var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);
var flag = false;
g.TakeUntil(anyCompleted).Subscribe(value => flag = value);
return s.Where(_ => flag).TakeUntil(anyCompleted);
}除了整个细节之外,我不喜欢订阅门,即使结果从来没有被订阅过(在这种情况下,这个操作符应该是不操作的)。有办法摆脱订阅吗?
我也尝试过这个实现,但是当涉及到清理它本身时,情况就更糟了:
return Observable.Create<T>(
o =>
{
var flag = false;
gate.Subscribe(value => flag = value);
return source.Subscribe(
value =>
{
if (flag) o.OnNext(value);
});
});下面是我用来检查实现的测试:
[TestMethod]
public void TestMethod1()
{
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
// the gate starts with false, so the source events are ignored
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
CollectionAssert.AreEqual(new int[0], output);
// setting the gate to true will let the source events pass
gate.OnNext(true);
source.OnNext(4);
CollectionAssert.AreEqual(new[] { 4 }, output);
source.OnNext(5);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// setting the gate to false stops source events from propagating again
gate.OnNext(false);
source.OnNext(6);
source.OnNext(7);
CollectionAssert.AreEqual(new[] { 4, 5 }, output);
// completing the source also completes the result
source.OnCompleted();
CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}
[TestMethod]
public void TestMethod2()
{
// completing the gate also completes the result
var output = new List<int>();
var source = new Subject<int>();
var gate = new Subject<bool>();
var result = source.When(gate);
result.Subscribe(output.Add, () => output.Add(-1));
gate.OnCompleted();
CollectionAssert.AreEqual(new[] { -1 }, output);
}发布于 2018-06-04 13:21:44
根据StackOverflow 1的响应,这是我想出来的;它通过了两个测试,还有第三个测试,其中我验证了寒冷的可观测数据只处理了一次:
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
return source.Publish(
ss =>
{
var gg = gate.Publish().RefCount();
var bothCompleted = Observable.Amb(ss.WhenCompleted(), gg.WhenCompleted());
return gate.Select(g => g ? ss : ss.IgnoreElements()).Switch().TakeUntil(bothCompleted);
});
}
private static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source) =>
source.Select(_ => Unit.Default).IgnoreElements().Concat(Observable.Return(Unit.Default));https://codereview.stackexchange.com/questions/195719
复制相似问题