下面的可观察序列将每个元素添加到ReplaySubject中,以便我可以在以后访问任何元素,甚至等待ReplaySubject完成。在到达timespan之后,它将完成ReaplySubject。
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});我希望序列能够运行,直到从上次"OnNext“调用到指定的时间,然后才完成。这将在各种蓝牙通信场景中非常有用,其中蓝牙设备将给我一系列数据,然后在没有任何形式的完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。所以,如果上次蓝牙通知已经“太久”了,我想完成ReplaySubject。
我可以通过创建一个定时器,在接收到每个元素时重置它,然后在计时器到达“太长”时完成ReplaySubject,但是我听说创建一个对象并在可观察的订阅中操作它是不安全的。
关于如何在“太长”间隔后完成序列,有什么建议吗?
下面是一个不符合我所听到的线程安全的版本,但是应该按预期工作:
bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
reading = false;
};
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeWhile(x => reading)
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
timer.Stop();
timer.Start();
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});根据Simonare的第一个答案,这似乎令人满意:
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});发布于 2019-02-01 20:19:06
您可以考虑使用超时操作符。唯一的缺点是它终止了错误信号。您可能需要处理错误。
Timeout操作符允许您在指定的时间范围内终止可观察到的onError终止,如果该可观察到的项未能发出任何项。
如果您使用下面的方法,您可以超越错误。
.Timeout(200, Promise.resolve(42));另一个变体允许您指示超时切换到您指定的可观察到的备份,而不是在触发超时条件时终止错误。
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
....
rfidPlayer.OnNext(....);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});发布于 2020-11-21 12:25:38
下面是您可以使用的自定义操作符TakeUntilTimeout,它是内置Timeout运算符之上的一个瘦层。
/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
this IObservable<T> source,
TimeSpan timeout)
{
return source.Timeout(timeout, Observable.Empty<T>());
}https://stackoverflow.com/questions/54486553
复制相似问题