首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RX Subscribe - subscribe()的意外行为&等待任务完成:~|

RX Subscribe - subscribe()的意外行为&等待任务完成:~|
EN

Stack Overflow用户
提问于 2016-06-06 00:31:48
回答 2查看 113关注 0票数 1

我有以下代码块,当运行时,我希望使用阻塞集合,并在Subscribe()操作中执行一些异步活动,这些活动在RX移动到观察者序列中的下一个条目之前等待。

实际发生的情况是,所有条目都是并行处理的(或看起来是)。

目前的代码如下所示!

代码语言:javascript
复制
    uploadQueues[workspaceId] //BlockingCollection<FileEvent>>
    .GetConsumingEnumerable()
    .ToObservable()
    .SubscribeOn(new TaskPoolScheduler(new TaskFactory()))
    .Subscribe(
      async fileEvent =>
       {
          //process file event
          Debug.WriteLine($"Upload queue processor 1: {fileEvent.Event} | {fileEvent.SourcePath} => {fileEvent.DestPath}");
          await Task.Delay(TimeSpan.FromSeconds(1));

       })

任何指向正确方向的指针都将不胜感激。我还想知道,如果不使用RX,我只是简单地产生一个长时间运行的TPL任务,该任务从阻塞集合中消耗!

有什么想法?

EN

回答 2

Stack Overflow用户

发布于 2016-06-06 12:48:25

Rx查询的主要问题是async/await部分,因为您没有处理或处理SynchronizationContext.Current

但它们是整个代码的其他问题。

SubscribeOn开始,TaskPoolScheduleIDisposable类,您应该正确地实现和处理它。

另外,当你使用ToObservable()方法时,它不会使用任何IScheduler - check for details here。您可以指定一个EventLoopScheduler来保证只有一个线程/资源将用于处理(尽管这不是必需的,因为GetConsumingEnumerable已经锁定了一个线程以供使用)。

如果你只是想模拟一个延迟,最好的方法是:

代码语言:javascript
复制
        enumerableItems.ToObservable()
            .Select(a => Observable.Return(a).DelaySubscription(TimeSpan.FromSeconds(1))) 
            .Concat() // Keep push order
            .Subscribe(
                fileEvent =>
                {
                    Debug.WriteLine(fileEvent);
                });

DelaySubscriptionDelay的效率要高一点请注意,延迟是不同步的,所以很可能会在另一个Thread.ThreadId中结束,但这并不重要,因为顺序和顺序将保持不变。

现在,如果你想对Rx使用async/await并保持单一威胁,这是另一个问题……

票数 2
EN

Stack Overflow用户

发布于 2016-06-07 09:56:44

对@J.Lennon的回答进行了投票

欢迎使用Rx。正如J.Lennon上面提到的,对上面的代码有一些改进建议。

与任何同步或异步代码一样,我们需要考虑我们的资源,以及我们如何确保在事后进行清理。我们还需要考虑如何处理异常。除了这些问题之外,当使用异步(因此并发性)时,我们还有取消的额外问题。我们应该提供取消操作的能力,我们还应该确保取消操作做了消费者所期望的事情。

虽然我认为Rx不是读取队列的最佳工具(请参阅http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#Wont),但如果您真的想强制Rx进入画面,那么您可能还需要确保满足上述问题。

下面的代码使用

  • CancellationTokens允许取消(https://msdn.microsoft.com/en-us/library/dd395014(v=vs.110).aspx
  • EventLoopSchedulerGetConsumingEnumerable(),以确保单个线程专用于处理队列。这也使其序列化。(http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler)
  • moving你在你的订阅者中的异步工作到查询管道中,它从上面belongs.
  • J.Lennon's Concat提示,以确保你正在做的异步工作也被序列化。(http://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#Concat)
  • All错误处理现在将是最终订阅者(订阅此方法的代码)唯一关心的问题。在这里,您将使用OnError处理程序。然而,这让我回到了为什么我不喜欢Rx处理队列的原因。如果失败了,你应该怎么做?尝试重新阅读邮件并陷入无限循环,或者将邮件扔到地板上假装什么都没有发生?(http://introtorx.com/Content/v1.0.10621.0/03_LifetimeManagement.html#Subscribe)

示例代码

代码语言:javascript
复制
IDictionary<string, BlockingCollection<object>> uploadQueues = new Dictionary<string, BlockingCollection<Object>>();
public IObservable<object> ListenToQueueEvents(string workspaceId)
{
    //Not sure what the return value is here, using `Object` as a place holder.
    //  Note that we are using an overload that takes a `CancellationToken`
    return Observable.Create<object>((obs, ct) =>
    {
        var els = new EventLoopScheduler(ts => new Thread(ts)
        {
            IsBackground = true,//? or false? Should it stop the process from terminating if it still running?
            Name = $"{workspaceId}Processor"
        });
        var subscription = uploadQueues[workspaceId] //BlockingCollection<FileEvent>>
            .GetConsumingEnumerable(ct) //Allow cancellation while wating for next item
            .ToObservable(els)  //Serialise onto a single thread.
            .Select(evt=>TheAsyncThingIWasDoingInTheSubscribe(evt).ToObservable())
            .Concat()
            .Subscribe(obs);
        //You could try to dispose of the els (EventLoopScheduler), But I have had issues doing so in the past. 
        //  Leaving as Background should allow it to die (but non deterministically) :-(
        return Task.FromResult(subscription);
    });
}
private static Task<object> TheAsyncThingIWasDoingInTheSubscribe(object evt)
{
    //The return of the async thing you were doing in the subscribe
    return Task.FromResult(new Object());
}
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37644229

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档