我目前正在阅读http://www.introtorx.com/,我对将Subject<T>从我的反应式代码中剥离出来非常感兴趣。我开始理解如何封装序列生成,以便更好地对给定序列进行推理。我读了几个SO问题,最后读了一些关于日程安排的文章。特别有趣的是使用Schedule(this IScheduler scheduler, Action<TState,Action<TState>>)重载- like this one的递归调度。
这本书开始在一些领域显示出它的时代,我看到的最大的一点是,它从未将其技术与可能使用Task和async/await语言特性实现的替代技术进行比较。我总是觉得我可以通过忽略书中的建议和使用异步玩具来编写更少的代码,但我的内心深处一直在抱怨我很懒,没有正确地学习模式。
这就是我的问题所在。如果我想以一定的时间间隔调度一个序列,支持取消,并在后台线程上执行工作,我可能会这样做:
static void Main(string[] args)
{
var sequence = Observable.Create<object>(o =>
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
DoWerk(o, cancellationTokenSource);
return cancellationTokenSource.Cancel;
});
sequence.Subscribe(p => Console.Write(p));
Console.ReadLine();
}
private static async void DoWerk(IObserver<object> o, CancellationTokenSource cancellationTokenSource)
{
string message = "hello world!";
for (int i = 0; i < message.Length; i++)
{
await Task.Delay(250, cancellationTokenSource.Token);
o.OnNext(message[i]);
if (cancellationTokenSource.IsCancellationRequested)
{
break;
}
}
o.OnCompleted();
}请注意,在使用Task.Run()显式借用线程池线程的情况下,使用async void创建并发性。然而,await Task.Delay()将会这样做,但它不会租用线程很长时间。
这里的限制和陷阱是什么?您可能更喜欢使用递归调度的原因是什么?
发布于 2017-11-14 08:46:09
我个人不会使用await Task.Delay(250, cancellationTokenSource.Token);作为降低循环速度的一种方式。它比Thread.Sleep(250)更好,但对我来说它仍然是一种代码味道。
我认为您应该使用内置运算符,而不是像这样的自定义解决方案。
您需要的运算符是最强大的运算符之一,但经常被忽略。试试Observable.Generate。他是如何:
static void Main(string[] args)
{
IObservable<char> sequence = Observable.Create<char>(o =>
{
string message = "hello world!";
return
Observable
.Generate(
0,
n => n < message.Length,
n => n + 1,
n => message[n],
n => TimeSpan.FromMilliseconds(250.0))
.Subscribe(o);
});
using (sequence.Subscribe(p => Console.Write(p)))
{
Console.ReadLine();
}
}这是自取消的(当您对订阅调用.Dispose()时),并且每隔250.0毫秒生成值一次。
我继续使用Observable.Create运算符来确保message变量被封装在可观察对象中-否则,当可观察对象正在使用它时,可能会有人更改message的值,从而破坏它。
作为另一种选择,这可能不像内存那样有效,但它是自封装的,请尝试如下所示:
IObservable<char> sequence =
Observable
.Generate(
"hello world!",
n => !String.IsNullOrEmpty(n),
n => n.Substring(1),
n => n[0],
n => TimeSpan.FromMilliseconds(250.0));最后,在你的问题中没有任何关于调度的“递归”。你这么说是什么意思?
我终于明白你在看什么了。我在问题中漏掉了。
下面是一个使用递归调度的示例:
IObservable<char> sequence = Observable.Create<char>(o =>
{
string message = "hello world!";
return Scheduler.Default.Schedule<string>(message, TimeSpan.FromMilliseconds(250.0), (state, schedule) =>
{
if (!String.IsNullOrEmpty(state))
{
o.OnNext(state[0]);
schedule(state.Substring(1), TimeSpan.FromMilliseconds(250.0));
}
else
{
o.OnCompleted();
}
});
});https://stackoverflow.com/questions/47253085
复制相似问题