我有一个非常简单的IObservable<int>,它每隔500ms起到脉冲发生器的作用:
var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
i => TimeSpan.FromMilliseconds(500))我有一个CancellationTokenSource (用于取消正在同时进行的其他工作)。
如何使用取消令牌源取消我的可观察序列?
发布于 2011-07-21 21:27:25
如果您正在使用GenerateWithTime (现在替换为在timespan函数重载中传递Generate ),则可以替换第二个参数来评估取消令牌的状态,如下所示:
var pulses = Observable.Generate(0,
i => !ts.IsCancellationRequested,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(500));或者,如果可以将导致设置取消令牌的事件转换为可观察对象本身,则可以使用类似以下内容:
pulses.TakeUntil(CancelRequested);我也在http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable上发布了更详细的解释。
发布于 2014-09-03 21:33:59
这是一个旧的线程,但仅供将来参考,这里有一个更简单的方法来完成它。
如果你有一个CancellationToken,你可能已经在处理任务了。因此,只需将其转换为Task,并让框架进行绑定:
using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);这将创建一个内部订阅者,该订阅者将在任务取消时被释放。这在大多数情况下都会起作用,因为大多数可观察对象只有在有订阅者的情况下才会产生值。
现在,如果你有一个实际的可观察对象由于某种原因需要被处理(可能是一个热的可观察对象,如果一个父任务被取消了,那么它就不再重要了),这可以通过继续来实现:
disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
if (t.Status == TaskStatus.Canceled)
disposableObservable.Dispose();
});发布于 2011-07-20 17:55:13
您可以使用以下代码片段将您的IObservable订阅与CancellationTokenSource连接
var pulses = Observable.GenerateWithTime(0,
i => true, i => i + 1, i => i,
i => TimeSpan.FromMilliseconds(500));
// Get your CancellationTokenSource
CancellationTokenSource ts = ...
// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);https://stackoverflow.com/questions/6759833
复制相似问题