我有两个数据来源。
让我们想象一下:
是否有任何优雅的方法使用system.reactive从系统A检索数据,但是当它失败时(在提要中没有数据)或减速时,使用来自system的数据?我想要实现某种开关,当它比B快的时候使用A源,我不想混合资源,所以我可以同时使用SystemA或SystemB。
class PriceFeed {
public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
{
}
private Price Convert(PriceFromA price) { //convert }
private Price Convert(PriceFromB price) { //convert }
}发布于 2019-07-05 15:06:23
有趣的问题。要做的第一件事就是编写某种频率收集功能。可能是这样的:
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
return source.Buffer(lookback, measuringFreq, scheduler)
.Select(l => l.Count);
}如果measuringFreq是1秒,lookback是5秒,这意味着每秒钟我们就会看到最后5秒内传递了多少条消息。快速而肮脏的例子:
var r = new System.Random();
var nums = Observable.Generate(
0,
i => i < 100,
i => i + 1,
i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpadnums是一个可观察的对象,平均每半秒钟就会生成一条消息(它随机选择0到1秒之间的持续时间)。freq每秒钟生成一个值,该值返回在最后5秒内生成的消息nums的数量(平均为10)。在我的机器上的最新运行中,我得到了这个:
11
11
12
10
12
11
9
9
10
9
8
...一旦我们有了获得频率的方法,你就需要编写一个函数来综合两个类似类型的观测值,根据频率进行切换。我写了这个:
public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? sourceB : sourceA) //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
.StartWith(sourceA)
.Switch();
return toReturn;
}首先,我们用GetFrequency得到这两个观测值的频率,然后把它们拉链在一起,然后比较它们。如果B比A更频繁,那么就使用B。如果它们相当频繁或A更频繁,则使用A。
aAdvantage变量允许您表达对A的更强的偏好,而不是B.0(默认的)意味着源A赢了一个平手,或者当它更频繁的时候,但否则B赢了。2意味着B必须在最近一段时间内产生比A多3条消息才能要求使用B。
通过适当的可观察性Publishing来避免多个订阅,这样的结果如下所示:
public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback,
IScheduler scheduler, int aAdvantage = 0)
{
return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB =>
_sourceA.GetFrequency(measuringFreq, lookback, scheduler)
.Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
.StartWith(_sourceA)
.Switch()
))
}我希望这能帮到你。在如何将其融入您的代码方面,您并没有留下太多信息。如果你想要的话,请包括一个mcve。
https://stackoverflow.com/questions/56885979
复制相似问题