首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >信号:随时间间隔收集值

信号:随时间间隔收集值
EN

Stack Overflow用户
提问于 2017-03-09 11:20:29
回答 2查看 1.3K关注 0票数 4

这可能是一个微不足道的问题,但我无法找到这个看似简单的任务的解决方案。由于我刚开始接触ReactiveSwift和反应性编程,我可能会忽略一些显而易见的东西。

基本上我想做的是这样的事情:

代码语言:javascript
复制
signal.collect(timeInterval: .seconds(5))

我想从一个信号中收集一段特定时间内的所有值。产生的信号将每x秒产生一个事件,其中包含从第一个信号收集的事件数组。

在ReactiveSwift中做这件事的最佳方法是什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-03-12 11:03:03

ReactiveSwift中没有用于此任务的内置操作符。相反,您可以使用以下方法,编写扩展:

代码语言:javascript
复制
import Foundation
import ReactiveSwift
import Result
public extension Signal {
    public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
        let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
        onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
            pipe.1.sendCompleted()
        }
        return Signal { observer in
            return self.observe { event in
                switch event {
                case let .value(value):
                    observer.send(value: value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }
        }.take(until: pipe.0)
    }

    public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
        return self.completeAfter(after: until).collect()
    }
}

然后使用signal.collectUntil(5)方法。

另一种方法是从timer中使用ReactiveSwift函数。示例(添加相同的扩展,如上面所示):

代码语言:javascript
复制
public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
    var signal: Signal<(), NoError>? = nil
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
        signal = innerSignal.map { _ in () }.take(first: 1)
    }
    return self.take(until: signal!).collect()
}

但是,我不喜欢这种方法,因为它是SignalProducer类型提取内部信号的伪造性质。

Signal类型本身也有timeout函数,但是由于它会产生错误,所以很难使用它。如何使用它的示例(仍然,添加到相同的扩展):

代码语言:javascript
复制
public func completeOnError() -> Signal<Value, Error> {
    return Signal { observer in
        return self.observe { event in
            switch(event) {
            case .value(let v): observer.send(value: v)
            case .failed(_): observer.sendCompleted()
            case .interrupted: observer.sendInterrupted()
            case .completed: observer.sendCompleted()
            }
        }
    }
}

public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
    return self
        .timeout(after: until,
                 raising: NSError() as! Error,
                 on: QueueScheduler())
        .completeOnError()
        .collect()
}

P.S.通过选择三个选项中的任何一个,考虑通过正确的调度程序或使用正确的调度器对您的解决方案进行顺行。

票数 4
EN

Stack Overflow用户

发布于 2017-03-14 10:37:05

基于answer by Petro Korienev (遗憾的是,这并不是我想要的),我创建了一个扩展来解决我的问题。扩展遵循ReactiveSwift collect函数的结构,以尽可能接近ReactiveSwift的意图。

它将收集在给定timeInterval上发送的所有值,然后将它们作为数组发送。在终止事件中,如果有其他值,它也会发送剩余值。

代码语言:javascript
复制
extension Signal {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
        return Signal<[Value], Error> { observer in
            var values: [Value] = []
            let sendAction: () -> Void = {
                observer.send(value: values)

                values.removeAll(keepingCapacity: true)
            }
            let disposable = CompositeDisposable()
            let scheduleDisposable = scheduler.schedule(
                    after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
                    interval: timeInterval,
                    action: sendAction
            )

            disposable += scheduleDisposable
            disposable += self.observe { (event: Event<Value, Error>) in
                if event.isTerminating {
                    if !values.isEmpty {
                        sendAction()
                    }

                    scheduleDisposable?.dispose()
                }

                switch event {
                case let .value(value):
                    values.append(value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }

            return disposable
        }
    }
}

extension SignalProducer {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
        return lift { (signal: ProducedSignal) in
            signal.collect(timeInterval: timeInterval, on: scheduler)
        }
    }
}

extension DispatchTimeInterval {
    var timeInterval: TimeInterval {
        switch self {
        case let .seconds(s):
            return TimeInterval(s)
        case let .milliseconds(ms):
            return TimeInterval(TimeInterval(ms) / 1000.0)
        case let .microseconds(us):
            return TimeInterval(UInt64(us) * NSEC_PER_USEC) / TimeInterval(NSEC_PER_SEC)
        case let .nanoseconds(ns):
            return TimeInterval(ns) / TimeInterval(NSEC_PER_SEC)
        }
    }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42694045

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档