首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >相当于ParallelFlux<T>反应堆3的.publish(Function<T,R>)

相当于ParallelFlux<T>反应堆3的.publish(Function<T,R>)
EN

Stack Overflow用户
提问于 2021-07-16 23:42:28
回答 1查看 24关注 0票数 0

我有一些像这样的反应堆流量:

代码语言:javascript
复制
SomeFlux.map(thing -> makeOtherThing(thing))
        .publish(fluxFunction)
        .subscribe();

其中,.publish()的参数是fluxFunction,它是Java8实现的对象

如下所示的Function<T, R>函数接口:

代码语言:javascript
复制
public class FluxFunction implements Function<Flux<OtherThing>, Flux<DiffOtherThing>> {
  @Override
  Flux<DiffOtherThing> apply(Flux<OtherThing>) {
    // some code which "consumes" incoming Flux and outputs another
  }
}

我的问题是,如果我将上面的代码更改为:

代码语言:javascript
复制
SomeFlux.parallel()
        .runOn(Schedulers.parrallel())
        .map(thing -> makeOtherThing(thing))
        .publish(fluxFunction)
        .subscribe();

.publish(fluxFunction)方法无效,因为我现在处理的是ParallelFlux,有没有可以在parallelFlux上使用的.publish(Function<>)的等价物?如果是这样的话,我如何编辑我的函数实现才能适应这种情况呢?

EN

回答 1

Stack Overflow用户

发布于 2021-07-17 04:42:02

因此,看起来等同的(至少在我的例子中)是:

代码语言:javascript
复制
SomeFlux.parallel()
        .runOn(Schedulers.parallel())
        .map(thing -> makeOtherThing(thing))
        .as(fluxFunction) // changed ".publish()" to ".as()" 
        .subscribe()

其中我还将fluxFunction重新定义为:

代码语言:javascript
复制
// Now operates on ParallelFlux<> rather than Flux<>
public class FluxFunction implements Function<ParallelFlux<OtherThing>, ParallelFlux<DiffOtherThing>> {
  @Override
  ParallelFlux<DiffOtherThing> apply(ParallelFlux<OtherThing>) {
    // some code which "consumes" incoming ParallelFlux and outputs another
  }
}

这对我来说似乎是完美的。

我希望这能帮助任何发现自己处于类似困境的人。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68411801

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档