给定两个热点可观测值t1和t2,我将如何从t2获得在t1中每个事件发生之前x秒和y秒之后发生的所有事件?
给予:
t1
t2 -1-2-3-4-5-6
如果t1间隔2秒,t2间隔1秒,并且我们正在寻找每一个t1事件的1秒的t2事件,则结果如下。
结果:
{ A,1,2,3 }
{ B,3,4,5 }
{ C,5,6 }
下面是实际的例子,我们需要解决上述问题。我们有一组电子邮件和另一组短信。我们需要发出另一个结果的流,其中有电子邮件和短信发生在之前或之后1分钟的电子邮件发送时间。
发布于 2018-01-11 11:54:30
这里的问题(正如Shlomo所提到的)是,我们需要在t2事件发生之前打开t1窗口。不幸的是,这是不可能的,因为一旦我们到达了t1中的事件,我们已经过了在t2中打开窗口的时间。
我们可以做的是使用Delay()及时地将t2向前移动。如果我们用x (以前的时间)来抵消它,我们可以将问题重组为“获取t2中在t1窗口打开和t1 + x + y关闭时发生的事件。我们可以使用GroupJoin来解决这个问题。”
var scheduler = new HistoricalScheduler();
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
.Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var delayedT2 = t2.Delay(x, scheduler);
var g = t1.GroupJoin(delayedT2 ,
_ => Observable.Timer(x + y, scheduler),
_ => Observable.Empty<Unit>(scheduler),
(a, b) => new { a, b}
);
scheduler.Start();这样做的结果是:
{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }这个结果仍然与你所期望的不太一样。这是因为在您的示例中,t2事件发生在完全相同的即时t1事件中。在这种情况下,首先处理t1 + y事件,然后关闭窗口,然后才能包含t2事件。这意味着我们实际上得到了(t1-01:00) <= t1 < (t1 + 01:00)。A的窗口是01:0000 -02.9999.这就是不包括在03:00发生的3的原因。
只需在我们的y时间中添加一个滴答,就可以将其修正为包含所有内容。
var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1)); 发布于 2018-01-03 15:38:15
代码转储答案(使用100毫秒代替1秒):
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(l => (char)('A' + l))
.Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Delay(TimeSpan.FromMilliseconds(100));
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var g = t1.Timestamp().Join(t2.Timestamp(),
c => Observable.Timer(y),
i => Observable.Timer(x + y),
(c, i) => new {GroupItem = c, RightItem = i}
)
.Where(a =>
(a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
|| (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
)
.Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
.GroupBy(a => a.GroupItem, a => a.RightItem);解释: Join都是关于"windows“的。因此,当定义一个联接时,您必须考虑从左侧可观察到和右侧可观察到的每个项目打开的时间窗口。我们在这里的窗口很难弄清楚:我们必须在它发生之前为左边的可观测X时间打开一个窗口,然后在它发生后的Y时间关闭它。
而不是做不可能的事情,所以我们只在左项出现后的Y时间内打开它,并让右项窗口由X+Y时间定义。然而,这将留给我们不应该包括的项目。所以我们在时间戳上使用一个Where来过滤掉它们。
最后,我们选择匿名类型和时间戳,并将其组合在一起。
我不认为GroupJoin是走在这里的路:你最终会拆散这个小组,并像我所做的那样重组它。
https://stackoverflow.com/questions/48075233
复制相似问题