首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用reactor订阅实现smartLifeCycle

使用reactor订阅实现smartLifeCycle
EN

Stack Overflow用户
提问于 2020-01-14 06:47:11
回答 1查看 189关注 0票数 0

下面是我为一个组件编写的代码,它启动一个Flux并订阅它,所有这些都在该类的构造函数中。这个特殊的流量来自一个mongoChangeStreams调用。除非出现错误,否则它不会终止。

我希望订阅始终保持活动状态,因此我在由于错误而终止的事件中重新启动了订阅。

我想到在构造函数中调用subscribe可能不是一个好主意。此外,我可能应该启用一种方法来优雅地关闭此应用程序,在关闭期间调用订阅上的取消。

我的猜测是我应该实现SmartLifeCycle,但是我不确定该怎么做。有没有在Flux订阅支持的组件上实现SmartLifeCycle的标准方法?

代码语言:javascript
复制
@Component
class SubscriptionManager(
    private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
    private val fooProcessor: FooProcessor
)  {

    private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor

    private fun subscribe() = buildSubscriber().also {
        fooFluxProvider.foos().subscribe(it)
    }

    private fun buildSubscriber(): BaseSubscriber<Foo> {
        return object : BaseSubscriber<Foo>() {
            override fun hookOnSubscribe(subscription: Subscription) {

                subscription.request(1)
            }

            override fun hookOnNext(value: Foo) {
                //process the foo
                fooProcessor.process(value)//sync call
                //ask for another foo
                request(1)
            }

            override fun hookOnError(throwable: Throwable) {
                logger.error("Something went wrong, restarting subscription", throwable)
                //restart the subscription. We'll recover if we're lucky
               subscription = subscribe()
            }
        }
    }




}
EN

回答 1

Stack Overflow用户

发布于 2020-01-23 11:38:35

  1. 不是在异常时创建重新订阅的Subscriber子类,而是在订阅之前在Flux上链接一个retry*操作符。如果上游Flux完成时出现异常,重试操作符将重新订阅上游Flux。例如,fooFluxProvider.foos().retry()将无限期重试。对于更高级的行为,还有retry*的其他变体,包括可与reactor-extra.
  2. Instead中的reactor.retry.Retry类一起使用的极其可定制的retryWhen,将subscriber传递给subscribe(subscriber),调用返回Disposablesubscribe方法之一。这为您提供了一个对象,稍后您可以在关机期间对其调用dispose()以取消成员实现SmartLifecycle:在构造函数中(或在start()中)
    • ,创建Flux (但不要在constructor)
    • In start()中订阅它),调用flux.subscribe()并将返回的Disposable保存到成员字段。与构造函数相比,start()方法更适合于启动后台作业。如果您希望在后台运行,还可以考虑在.subscribe()之前链接.subscribeOn(Scheduler) (默认情况下,订阅发生在调用subscribe的线程上)。调用disposable.dispose()

  • In stop()

可能是这样的:

代码语言:javascript
复制
class SubscriptionManager(
        fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
        fooProcessor: FooProcessor
) : SmartLifecycle {
    private val logger = LoggerFactory.getLogger(javaClass)

    private val fooFlux = fooFluxProvider.foos()
            // Subscribe on a parallel scheduler to run in the background
            .subscribeOn(Schedulers.parallel())
            // Publish on a boundedElastic scheduler if fooProcessor.process blocks
            .publishOn(Schedulers.boundedElastic())
            // Use .doOnNext to send the foo to your processor
            // Alternatively use .flatMap/.concatMap/.flatMapSequential if the processor returns a Publisher
            // Alternatively use .map if the processor transforms the foo, and you need to operate on the returned value
            .doOnNext(fooProcessor::process)
            // Log if an exception occurred
            .doOnError{ e -> logger.error("Something went wrong, restarting subscription", e) }
            // Resubscribe if an exception occurred
            .retry()
            // Repeat if you want to resubscribe if the upstream flux ever completes successfully
            .repeat()

    private var disposable: Disposable? = null

    @Synchronized
    override fun start() {
        if (!isRunning) {
            disposable = fooFlux.subscribe()
        }
    }

    @Synchronized
    override fun stop() {
        disposable?.dispose()
        disposable = null
    }

    @Synchronized
    override fun isRunning(): Boolean {
        return disposable != null
    }

}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59725296

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档