这可能是一个微不足道的问题,但我无法找到这个看似简单的任务的解决方案。由于我刚开始接触ReactiveSwift和反应性编程,我可能会忽略一些显而易见的东西。
基本上我想做的是这样的事情:
signal.collect(timeInterval: .seconds(5))我想从一个信号中收集一段特定时间内的所有值。产生的信号将每x秒产生一个事件,其中包含从第一个信号收集的事件数组。
在ReactiveSwift中做这件事的最佳方法是什么?
发布于 2017-03-12 11:03:03
ReactiveSwift中没有用于此任务的内置操作符。相反,您可以使用以下方法,编写扩展:
import Foundation
import ReactiveSwift
import Result
public extension Signal {
public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
pipe.1.sendCompleted()
}
return Signal { observer in
return self.observe { event in
switch event {
case let .value(value):
observer.send(value: value)
case .completed:
observer.sendCompleted()
case let .failed(error):
observer.send(error: error)
case .interrupted:
observer.sendInterrupted()
}
}
}.take(until: pipe.0)
}
public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
return self.completeAfter(after: until).collect()
}
}然后使用signal.collectUntil(5)方法。
另一种方法是从timer中使用ReactiveSwift函数。示例(添加相同的扩展,如上面所示):
public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
var signal: Signal<(), NoError>? = nil
timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
signal = innerSignal.map { _ in () }.take(first: 1)
}
return self.take(until: signal!).collect()
}但是,我不喜欢这种方法,因为它是SignalProducer类型提取内部信号的伪造性质。
Signal类型本身也有timeout函数,但是由于它会产生错误,所以很难使用它。如何使用它的示例(仍然,添加到相同的扩展):
public func completeOnError() -> Signal<Value, Error> {
return Signal { observer in
return self.observe { event in
switch(event) {
case .value(let v): observer.send(value: v)
case .failed(_): observer.sendCompleted()
case .interrupted: observer.sendInterrupted()
case .completed: observer.sendCompleted()
}
}
}
}
public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
return self
.timeout(after: until,
raising: NSError() as! Error,
on: QueueScheduler())
.completeOnError()
.collect()
}P.S.通过选择三个选项中的任何一个,考虑通过正确的调度程序或使用正确的调度器对您的解决方案进行顺行。
发布于 2017-03-14 10:37:05
基于answer by Petro Korienev (遗憾的是,这并不是我想要的),我创建了一个扩展来解决我的问题。扩展遵循ReactiveSwift collect函数的结构,以尽可能接近ReactiveSwift的意图。
它将收集在给定timeInterval上发送的所有值,然后将它们作为数组发送。在终止事件中,如果有其他值,它也会发送剩余值。
extension Signal {
func collect(timeInterval: DispatchTimeInterval,
on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
return Signal<[Value], Error> { observer in
var values: [Value] = []
let sendAction: () -> Void = {
observer.send(value: values)
values.removeAll(keepingCapacity: true)
}
let disposable = CompositeDisposable()
let scheduleDisposable = scheduler.schedule(
after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
interval: timeInterval,
action: sendAction
)
disposable += scheduleDisposable
disposable += self.observe { (event: Event<Value, Error>) in
if event.isTerminating {
if !values.isEmpty {
sendAction()
}
scheduleDisposable?.dispose()
}
switch event {
case let .value(value):
values.append(value)
case .completed:
observer.sendCompleted()
case let .failed(error):
observer.send(error: error)
case .interrupted:
observer.sendInterrupted()
}
}
return disposable
}
}
}
extension SignalProducer {
func collect(timeInterval: DispatchTimeInterval,
on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
return lift { (signal: ProducedSignal) in
signal.collect(timeInterval: timeInterval, on: scheduler)
}
}
}
extension DispatchTimeInterval {
var timeInterval: TimeInterval {
switch self {
case let .seconds(s):
return TimeInterval(s)
case let .milliseconds(ms):
return TimeInterval(TimeInterval(ms) / 1000.0)
case let .microseconds(us):
return TimeInterval(UInt64(us) * NSEC_PER_USEC) / TimeInterval(NSEC_PER_SEC)
case let .nanoseconds(ns):
return TimeInterval(ns) / TimeInterval(NSEC_PER_SEC)
}
}
}https://stackoverflow.com/questions/42694045
复制相似问题