下面是我为一个组件编写的代码,它启动一个Flux并订阅它,所有这些都在该类的构造函数中。这个特殊的流量来自一个mongoChangeStreams调用。除非出现错误,否则它不会终止。
我希望订阅始终保持活动状态,因此我在由于错误而终止的事件中重新启动了订阅。
我想到在构造函数中调用subscribe可能不是一个好主意。此外,我可能应该启用一种方法来优雅地关闭此应用程序,在关闭期间调用订阅上的取消。
我的猜测是我应该实现SmartLifeCycle,但是我不确定该怎么做。有没有在Flux订阅支持的组件上实现SmartLifeCycle的标准方法?
@Component
class SubscriptionManager(
private val fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
private val fooProcessor: FooProcessor
) {
private var subscription: BaseSubscriber<Foo> = subscribe() //called in constructor
private fun subscribe() = buildSubscriber().also {
fooFluxProvider.foos().subscribe(it)
}
private fun buildSubscriber(): BaseSubscriber<Foo> {
return object : BaseSubscriber<Foo>() {
override fun hookOnSubscribe(subscription: Subscription) {
subscription.request(1)
}
override fun hookOnNext(value: Foo) {
//process the foo
fooProcessor.process(value)//sync call
//ask for another foo
request(1)
}
override fun hookOnError(throwable: Throwable) {
logger.error("Something went wrong, restarting subscription", throwable)
//restart the subscription. We'll recover if we're lucky
subscription = subscribe()
}
}
}
}发布于 2020-01-23 11:38:35
retry*操作符。如果上游Flux完成时出现异常,重试操作符将重新订阅上游Flux。例如,fooFluxProvider.foos().retry()将无限期重试。对于更高级的行为,还有retry*的其他变体,包括可与reactor-extra.reactor.retry.Retry类一起使用的极其可定制的retryWhen,将subscriber传递给subscribe(subscriber),调用返回Disposable的subscribe方法之一。这为您提供了一个对象,稍后您可以在关机期间对其调用dispose()以取消成员实现SmartLifecycle:在构造函数中(或在start()中)Flux (但不要在constructor)start()中订阅它),调用flux.subscribe()并将返回的Disposable保存到成员字段。与构造函数相比,start()方法更适合于启动后台作业。如果您希望在后台运行,还可以考虑在.subscribe()之前链接.subscribeOn(Scheduler) (默认情况下,订阅发生在调用subscribe的线程上)。调用disposable.dispose(),
stop()
可能是这样的:
class SubscriptionManager(
fooFluxProvider: FooFluxProvider, //calling foos() on this returns a Flux of foos
fooProcessor: FooProcessor
) : SmartLifecycle {
private val logger = LoggerFactory.getLogger(javaClass)
private val fooFlux = fooFluxProvider.foos()
// Subscribe on a parallel scheduler to run in the background
.subscribeOn(Schedulers.parallel())
// Publish on a boundedElastic scheduler if fooProcessor.process blocks
.publishOn(Schedulers.boundedElastic())
// Use .doOnNext to send the foo to your processor
// Alternatively use .flatMap/.concatMap/.flatMapSequential if the processor returns a Publisher
// Alternatively use .map if the processor transforms the foo, and you need to operate on the returned value
.doOnNext(fooProcessor::process)
// Log if an exception occurred
.doOnError{ e -> logger.error("Something went wrong, restarting subscription", e) }
// Resubscribe if an exception occurred
.retry()
// Repeat if you want to resubscribe if the upstream flux ever completes successfully
.repeat()
private var disposable: Disposable? = null
@Synchronized
override fun start() {
if (!isRunning) {
disposable = fooFlux.subscribe()
}
}
@Synchronized
override fun stop() {
disposable?.dispose()
disposable = null
}
@Synchronized
override fun isRunning(): Boolean {
return disposable != null
}
}https://stackoverflow.com/questions/59725296
复制相似问题