首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从定期异步请求创建可观察对象

从定期异步请求创建可观察对象
EN

Stack Overflow用户
提问于 2020-11-03 16:44:26
回答 3查看 504关注 0票数 6

我想要一种将异步方法转换为observable的通用方法。在我的例子中,我处理的方法使用

从API获取数据。

假设我们有这样的方法

它需要成为一个单一的

其中,这些值是由以下各项组合生成的:

重复定期调用

(例如每x秒)

手动触发的调用

在任何给定的时间(例如,当用户点击刷新时)。

因为有两种方式可以触发

并发性可能是一个问题。为了避免要求

是线程安全的,我希望限制并发性,以便只有一个线程同时执行该方法。因此,我需要使用某种策略来处理重叠请求。我画了一个(类似)大理石图,试图描述问题和想要的结果。

我的直觉告诉我有一个简单的方法可以做到这一点,所以请给我一些见解:)

到目前为止,这是我得到的解决方案。不幸的是,它并没有解决并发问题。

代码语言:javascript
复制
public class ObservableCreationWrapper
    {
        private Subject _manualCallsSubject = new Subject();
        private Func> _methodToCall;
        private IObservable _manualCalls;

        public IObservable Stream { get; private set; }

        public ObservableCreationWrapper(Func> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

延时重复的扩展方法:

代码语言:javascript
复制
static class Extensions
{
    public static IObservable DelayRepeat(this IObservable source, TimeSpan delay) => source
        .Concat(
            Observable.Create(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

包含生成可观察对象的方法的服务示例

代码语言:javascript
复制
class SomeService
{
    private int _ticks = 0;

    public async Task GetSomeValueAsync()
    {
        //Just a hack to dermine if request was triggered manuall or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

像这样使用(数据竞争将会发生):

代码语言:javascript
复制
static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-11-12 19:31:49

以下是我对这个问题的看法:

更新:

我借用了英格玛斯的想法,大大简化了我所建议的解决方案。

答案

..。The The The

方法会自动处理取消这一杂乱的业务,并且可以通过使用一个简单的方法来强制执行不重叠的要求。

..。

代码语言:javascript
复制
/// 
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// 
public static IObservable PeriodicAndManual(
    Func> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}

The The The

参数是触发手动调用的机制。

使用示例:

代码语言:javascript
复制
int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

输出:

代码语言:javascript
复制
19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

使用

运算符,以便在前一个异步操作运行时删除元素,该运算符是从

这个

问题。

似乎Rx库并不能令人满意地处理这种混乱的业务,因为它只是

省略对其创建的CancellationTokenSources的处置

..。

票数 3
EN

Stack Overflow用户

发布于 2020-11-12 19:48:44

下面是您需要的查询:

代码语言:javascript
复制
var subject = new Subject();
var delay = TimeSpan.FromSeconds(1.0);

IObservable query =
    subject
        .StartWith(Unit.Default)
        .Select(x => Observable.Timer(TimeSpan.Zero, delay))
        .Switch()
        .SelectMany(x => Observable.FromAsync(() => GetSomeData()));

如果任何时候你打电话给

它会立即触发对

并在随后基于

设置在

..。

使用

将在有订阅者的情况下立即设置查询。

使用

方法取消任何挂起的操作。

被称为。

这应该与你的大理石图相匹配。

上面的版本没有引入值之间的延迟。

版本2应该。

代码语言:javascript
复制
var subject = new Subject();
var delay = TimeSpan.FromSeconds(5.0);

var source = Observable.FromAsync(() => GetSomeData());

IObservable query =
    subject
        .StartWith(Unit.Default)
        .Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
        .Switch();

我使用了

运算符在值之间引入延迟。只要

仅生成单个值(该值

确实如此),这应该可以很好地工作。

票数 0
EN

Stack Overflow用户

发布于 2020-11-14 10:34:45

我建议不要尝试取消已经开始的呼叫。事情会变得太混乱。如果GetSomeValueAsync中的逻辑涉及数据库调用和/或web API调用,您根本无法真正取消调用。

我认为这里的关键是确保所有对GetSomeValueAsync的调用都是序列化的。

我创建了下面的解决方案,它是基于英格明斯的版本1。它是在asp.net核心3.1上的一个webassembly blazor页面上测试的,运行良好。

代码语言:javascript
复制
private int _ticks = 0; //simulate a resource you want serialized access

//for manual event, trigger will be 0; for Timer event, trigger will be 1,2,3...
protected async Task GetSomeValueAsync(string trigger)
{
    var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";

    await Task.Delay(1000);
    _ticks += 1;
    return valueToReturn;
}

//define two subjects
private Subject _testSubject = new Subject();
private Subject _getDataSubject = new Subject();

//driving observable, based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable getDataObservable =
    _testSubject
   .StartWith("Init")
   .Select(x => Observable.Timer(TimeSpan.Zero, delay).Select(i => i.ToString()))
   .Switch()
   .WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
   .Where(a => a.Second == "IDLE")
   .Select(a => a.First);

//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
     _getDataSubject.OnNext("WORKING");
     //_service.LogToConsole is my helper function to log data to console
     await _service.LogToConsole(await GetSomeValueAsync(t)); 
     _getDataSubject.OnNext("IDLE");
}));

就是这样。我使用了一个按钮来触发手动事件。The The The

_

输出中的刻度始终是按顺序的,也就是说,没有重叠发生。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64659387

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档