首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rx.net合并复位流

rx.net合并复位流
EN

Stack Overflow用户
提问于 2020-01-29 14:00:57
回答 1查看 86关注 0票数 1

我们有一个正在运行的服务,它处理从system到system的消息。

代码语言:javascript
复制
aSystem.Messages.Subscribe(message => 
{
   try
   {
      ProcessMessage(message);
   }
   catch(Exception ex)
   {
      _logger.LogFatal(ex.Message);
   }
})

问题是,我们每秒钟至少接收一条消息,我们的LogFatal消息被配置为发送电子邮件。结果,邮箱在某一时刻爆炸了。

通过添加一个保存最后时间戳的自定义日志类,代码得到了“改进”。基于该时间戳,消息将记录或不记录。

这看起来很麻烦,我认为这是Rx.NET的完美场景。我们需要的是:

  • 1)如果字符串更改,记录
  • 2)如果一段时间过去了

我尝试了以下几点:

代码语言:javascript
复制
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(10)); // log at least every 10 seconds

var subscription = logMessagesChangedStream.Merge(logMessagesSampleStream).Subscribe(result =>
{
    _logger.LogFatal(result);
});

aSystem.Messages.Subscribe(message => 
{
   try
   {
      ProcessMessage(message);
   }
   catch(Exception ex)
   {
      logSubject.OnNext(ex.Message);
   }
})

它看起来很有效,但是这将记录消息两次,一次是针对DistinctUntilChanged,另一次是针对Sample。因此,如果其中一个发出值,我就应该重新设置流。他们独立完美地工作,一旦合并,他们应该互相倾听;-)

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-01-30 12:25:14

有一个模棱两可的运算符Amb,它竞争两个序列,看哪一个先赢。

代码语言:javascript
复制
Observable.Amb(logMessagesChangedStream, logMessagesSampleStream) 

获胜的流继续传播到最后--我们不想那样做。我们有兴趣重新开始下一个价值的竞赛。让我们这样做:

代码语言:javascript
复制
    Observable.Amb(logMessagesChangedStream, logMessagesSampleStream)
      .Take(1)
      .Repeat()

现在,最后一个问题是,每次我们重新启动比赛时,DistinctUntilChanged都会失去它的状态,它的行为就是立即推送它得到的第一个值。让我们把它变成一个可以观察到的热点。

代码语言:javascript
复制
logSubject.DistinctUntilChanged().Publish();

把这一切结合在一起:

代码语言:javascript
复制
var logSubject = new Subject<string>();
var logMessagesChangedStream = logSubject.DistinctUntilChanged().Publish(); // log on every message change
var logMessagesSampleStream = logSubject.Sample(TimeSpan.FromSeconds(5)); // log at least every 5 seconds    

var oneof =
        Observable
          .Amb(logMessagesChangedStream, logMessagesSampleStream)
          .Take(1)
          .Repeat(); 

   logMessagesChangedStream.Connect();
   oneof.Timestamp().Subscribe(c => Console.WriteLine(c));

把10秒改为5秒,因为

测试

代码语言:javascript
复制
    Action<string> onNext = logSubject.OnNext;

    onNext("a");
    onNext("b");
    Delay(1000, () => { onNext("c"); onNext("c"); onNext("c"); onNext("d"); });
    Delay(3000, () => { onNext("d"); onNext("e"); });
    Delay(6000, () => { onNext("e"); });
    Delay(10000, () => { onNext("e"); });

输出

代码语言:javascript
复制
a@1/30/2020 12:17:52 PM +00:00
b@1/30/2020 12:17:52 PM +00:00
c@1/30/2020 12:17:53 PM +00:00
d@1/30/2020 12:17:53 PM +00:00
e@1/30/2020 12:17:55 PM +00:00
e@1/30/2020 12:18:00 PM +00:00
e@1/30/2020 12:18:05 PM +00:00
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59968703

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档