首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >MultiCast和订阅Subject的反应式框架问题

MultiCast和订阅Subject的反应式框架问题
EN

Stack Overflow用户
提问于 2013-04-03 08:27:29
回答 1查看 618关注 0票数 3

我刚刚开始学习如何使用Reactive Framework,并且正在为能够多播发布到多个订阅者而苦苦挣扎。

我让一切都运行得很好,如下所示:

代码语言:javascript
复制
m_MessagePublisher = m_ServerClient.MessageQueue
      .GetConsumingEnumerable()
      .ToObservable(TaskPoolScheduler.Default);

var genericServerMessageSubscriber = m_MessagePublisher
      .Where(message => message is GenericServerMessage)
      .Subscribe(message =>
      {
          // do something here
      }

但后来我意识到这不支持多播,当我试图附加另一个应该被相同消息击中的订阅者时,它不会触发。我一直在研究.MultiCast扩展,试图弄清楚Subject是如何在其中发挥作用的,但还没能让它发挥作用:

代码语言:javascript
复制
var subject = new Subject<BesiegedMessage>();

var messagePublisher = m_ServerClient.MessageQueue
      .GetConsumingEnumerable()
      .ToObservable(TaskPoolScheduler.Default)
      .Multicast(subject);

// All generic server messages are handled here
var genericServerMessageSubscriber = subject
      .Where(message => message is GenericServerMessage)
      .Subscribe(message =>
      {
            // do something here
      }

但现在没有一个用户受到影响,包括之前工作正常的单个用户。为了能够正确地多播到多个订户,我在这里缺少了什么?

更新:使用subject(Subject)而不是Multicast(subject)似乎可以实现多播,这让我对.MultiCast()的用途感到非常困惑

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-04-03 09:35:31

编辑:

哈哈-我读得太快了-你问的是simpler...that说的方式,我认为下面的内容很重要,所以我离开了it...So,你的问题-尝试添加下面这行:

代码语言:javascript
复制
var messagePublisher = m_ServerClient.MessageQueue
  .GetConsumingEnumerable()
  .ToObservable(TaskPoolScheduler.Default)
  .Multicast(subject)
  // Here: connectable observables are a PITA...
  .RefCount();

结束编辑:

Hmm...how来描述Multicast...I猜想让我们通过例子来实现:

假设你得到了这样的东西--你认为它会产生什么?

代码语言:javascript
复制
int delay = 100;
var source = Observable.Interval(TimeSpan.FromMilliseconds(delay));
var publishingFrontend = new Subject<string>();

// Here's "raw"
var rawStream = source;
using(rawStream.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(rawStream.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

由于您订阅的是原始数据流,因此新的订阅者基本上是从头开始的:

(如果您重新运行,这将不是100%匹配,因为我采用了Thread.Sleep的wussy方法,但应该很接近)

代码语言:javascript
复制
0
1
2
Inner: 0
3
Inner: 1
4
5
6
7
8
9

如果我们想要“中流打结”,我们使用Publish().RefCount()模式:

代码语言:javascript
复制
var singleSource = source.Publish().RefCount();
using(singleSource.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(singleSource.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

这会产生类似如下的结果:

代码语言:javascript
复制
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

假设我们没有Publish()运算符--我们如何模拟它呢?

代码语言:javascript
复制
Console.WriteLine("Simulated Publish:");
// use a subject to proxy values...
var innerSubject = new Subject<long>();
// wire up the source to "write to" the subject
var innerSub = source.Subscribe(innerSubject);
var simulatedSingleSource = Observable.Create<long>(obs =>
{
    // return subscriptions to the "proxied" subject
    var publishPoint = innerSubject.Subscribe(obs);        
    return publishPoint;
});

运行这个程序,我们得到:

代码语言:javascript
复制
Simulated Publish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

哇哦!

但还有另一种方法...

代码语言:javascript
复制
Console.WriteLine("MulticastPublish:");
var multicastPublish = source.Multicast(new Subject<long>()).RefCount();    
using(multicastPublish.Subscribe(x => Console.WriteLine("{0}", x)))
{
    Thread.Sleep(delay * 3);
    using(multicastPublish.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
    {
        Thread.Sleep(delay * 3);
    }
    Thread.Sleep(delay * 5);
}

输出:

代码语言:javascript
复制
MulticastPublish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9

编辑:

事实上,所有的ConnectableObservable生成扩展都依赖于Multicast/Subject配对:

代码语言:javascript
复制
Publish() => Multicast(new Subject<T>)
Replay() => Multicast(new ReplaySubject<T>)
PublishLast() => Multicast(new AsyncSubject<T>)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/15776877

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档