首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxSwift。依次执行单独的可观察性

RxSwift。依次执行单独的可观察性
EN

Stack Overflow用户
提问于 2018-09-28 10:29:18
回答 3查看 3.9K关注 0票数 6

我正试图实现我的可观察性,只有在以前的可观测性已经完成时才能执行。我不能使用flatMap,因为订阅可以从不同的地方调用,而且这些可观察到的内容并不是相互连接的。具体来说:我的CollectionView从服务器加载更多的内容,在用户单击“发送评论”按钮2秒后,CollectionView仍在加载它的批处理。因此,我想等到CollectionView更新完成,然后执行我的评论的发布请求。我创建了一个名为ObservableQueue的类,它运行得很好。但我需要知道它是否存在内存泄漏、死锁等问题,或者我只是漏掉了什么。下面是:

代码语言:javascript
复制
extension CompositeDisposable {

    @discardableResult
    func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
        return insert(Disposables.create(with: disposeAction))
    }

}

class ObservableQueue {

    private let lock = NSRecursiveLock()
    private let relay = BehaviorRelay(value: 0)
    private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        return Observable.create({ observer -> Disposable in
            let disposable = CompositeDisposable()

            let relayDisposable = self
                .relay
                .observeOn(self.scheduler)
                .filter({ value -> Bool in
                    if value > 0 {
                        return false
                    }

                    self.lock.lock(); defer { self.lock.unlock() }

                    if self.relay.value > 0 {
                        return false
                    }

                    self.relay.accept(self.relay.value + 1)

                    disposable.insert {
                        self.lock.lock(); defer { self.lock.unlock() }
                        self.relay.accept(self.relay.value - 1)
                    }

                    return true
                })
                .take(1)
                .flatMapLatest { _ in observable }
                .subscribe { observer.on($0) }

            _ = disposable.insert(relayDisposable)

            return disposable
        })
    }

}

然后我就可以这样用了:

代码语言:javascript
复制
let queue = ObservableQueue()

...

// first observable
let observable1 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable1)
    .subscribe(onNext: { _ in
        print("here1")
     })
    .disposed(by: rx.disposeBag)

// second observable
let observable2 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable2)
    .subscribe(onNext: { _ in
        print("here2")
    })
    .disposed(by: rx.disposeBag)

// third observable
let observable3 = Observable
    .just(0)
    .delay(5, scheduler: MainScheduler.instance)

queue
    .enqueue(observable3)
    .subscribe(onNext: { _ in
        print("here3")
    })
    .disposed(by: rx.disposeBag)
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-09-29 00:47:19

CLGeocoder也有同样的问题。根据文档,在处理之前的请求时,您不能调用其中一个Geo编码器方法,这与您想要做的非常类似。在这个gist (https://gist.github.com/danielt1263/64bda2a32c18b8c28e1e22085a05df5a)中,您会发现我在后台线程上进行了可观察到的调用,并使用信号量保护作业。这是关键,你需要信号量,而不是锁。

像这样的东西应该对你有用:

代码语言:javascript
复制
class ObservableQueue {

    private let semaphore = DispatchSemaphore(value: 1)
    private let scheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated)

    func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
        let _semaphore = semaphore // To avoid the use of self in the block below
        return Observable.create { observer in
            _semaphore.wait()
            let disposable = observable.subscribe { event in
                switch event {
                case .next:
                    observer.on(event)
                case .error, .completed:
                    observer.on(event)
                }
            }
            return Disposables.create {
                disposable.dispose()
                _semaphore.signal()
            }
        }
        .subscribeOn(scheduler)
    }
}
票数 7
EN

Stack Overflow用户

发布于 2018-09-28 10:47:20

我会给你一些建议,我认为这将在未来帮助你。

  1. 尽量避免使用Observable.create,这是一个可观察到的“蛮力”,它根本无法处理背压,你必须自己实现它,这不是一件容易的事情。
  2. 通常,对于HTTP调用,您不需要观察到,您应该使用SingleCompletable,因为您只期望从您的服务器得到一个响应,而不是一个响应流。
  3. 您应该小心使用strong self中的onNext/on...,作为经验规则,如果订阅观察者的类有dispose包,则应该使用weak self

现在,对于您的特殊情况,如果只需要这对观察者(获取和发送注释),我认为队列有点过火了。您只需调用"fetch“观察者的do(onNext:)方法上的post注释观察者(如果可用的话)。每次触发"onNext“事件时都会调用Do on next。

如果您仍然需要一个队列,我将使用一个OperationQueue,它只对操作进行队列,并且有一个类似于observeOperationchanges() -> Observeble<Operation>的方法--这将在每次操作完成时触发。通过这种方式,您只订阅一次并多次排队,但这可能不适合您的需要。

票数 0
EN

Stack Overflow用户

发布于 2018-09-28 11:41:29

我将使用.combineLatest()来生成一个事件,一旦两个可观察到的结果都发出了某种信息。请参阅http://rxmarbles.com/#combineLatest

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52553397

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档