首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >“长”间隔后的完全可观测序列

“长”间隔后的完全可观测序列
EN

Stack Overflow用户
提问于 2019-02-01 20:10:22
回答 2查看 282关注 0票数 2

下面的可观察序列将每个元素添加到ReplaySubject中,以便我可以在以后访问任何元素,甚至等待ReplaySubject完成。在到达timespan之后,它将完成ReaplySubject。

代码语言:javascript
复制
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

我希望序列能够运行,直到从上次"OnNext“调用到指定的时间,然后才完成。这将在各种蓝牙通信场景中非常有用,其中蓝牙设备将给我一系列数据,然后在没有任何形式的完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。所以,如果上次蓝牙通知已经“太久”了,我想完成ReplaySubject。

我可以通过创建一个定时器,在接收到每个元素时重置它,然后在计时器到达“太长”时完成ReplaySubject,但是我听说创建一个对象并在可观察的订阅中操作它是不安全的。

关于如何在“太长”间隔后完成序列,有什么建议吗?

下面是一个不符合我所听到的线程安全的版本,但是应该按预期工作:

代码语言:javascript
复制
bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    reading = false;
};

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeWhile(x => reading)
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            timer.Stop();
            timer.Start();

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

根据Simonare的第一个答案,这似乎令人满意:

代码语言:javascript
复制
                characteristic.WhenNotificationReceived()
                .Timeout(TimeSpan.FromSeconds(1))
                .Subscribe(
                    onNext: result =>
                    {
                        string nextTag = BitConverter.ToString(result.Data);
                        nextTag = nextTag.Replace("-", "");

                        rfidPlayer.OnNext(nextTag);
                    },
                    onError: error =>
                    {
                        rfidPlayer.OnCompleted();
                    });
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-02-01 20:19:06

您可以考虑使用超时操作符。唯一的缺点是它终止了错误信号。您可能需要处理错误。

Timeout操作符允许您在指定的时间范围内终止可观察到的onError终止,如果该可观察到的项未能发出任何项。

如果您使用下面的方法,您可以超越错误。

代码语言:javascript
复制
 .Timeout(200, Promise.resolve(42));

另一个变体允许您指示超时切换到您指定的可观察到的备份,而不是在触发超时条件时终止错误。

代码语言:javascript
复制
characteristic.WhenNotificationReceived()
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: result =>
        {
            ....
            rfidPlayer.OnNext(....);
        },
        onError: error =>
        {
            rfidPlayer.OnCompleted();
        });
票数 1
EN

Stack Overflow用户

发布于 2020-11-21 12:25:38

下面是您可以使用的自定义操作符TakeUntilTimeout,它是内置Timeout运算符之上的一个瘦层。

代码语言:javascript
复制
/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
    this IObservable<T> source,
    TimeSpan timeout)
{
    return source.Timeout(timeout, Observable.Empty<T>());
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54486553

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档