我有一些像这样的反应堆流量:
SomeFlux.map(thing -> makeOtherThing(thing))
.publish(fluxFunction)
.subscribe();其中,.publish()的参数是fluxFunction,它是Java8实现的对象
如下所示的Function<T, R>函数接口:
public class FluxFunction implements Function<Flux<OtherThing>, Flux<DiffOtherThing>> {
@Override
Flux<DiffOtherThing> apply(Flux<OtherThing>) {
// some code which "consumes" incoming Flux and outputs another
}
}我的问题是,如果我将上面的代码更改为:
SomeFlux.parallel()
.runOn(Schedulers.parrallel())
.map(thing -> makeOtherThing(thing))
.publish(fluxFunction)
.subscribe();.publish(fluxFunction)方法无效,因为我现在处理的是ParallelFlux,有没有可以在parallelFlux上使用的.publish(Function<>)的等价物?如果是这样的话,我如何编辑我的函数实现才能适应这种情况呢?
发布于 2021-07-17 04:42:02
因此,看起来等同的(至少在我的例子中)是:
SomeFlux.parallel()
.runOn(Schedulers.parallel())
.map(thing -> makeOtherThing(thing))
.as(fluxFunction) // changed ".publish()" to ".as()"
.subscribe()其中我还将fluxFunction重新定义为:
// Now operates on ParallelFlux<> rather than Flux<>
public class FluxFunction implements Function<ParallelFlux<OtherThing>, ParallelFlux<DiffOtherThing>> {
@Override
ParallelFlux<DiffOtherThing> apply(ParallelFlux<OtherThing>) {
// some code which "consumes" incoming ParallelFlux and outputs another
}
}这对我来说似乎是完美的。
我希望这能帮助任何发现自己处于类似困境的人。
https://stackoverflow.com/questions/68411801
复制相似问题