首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将ConnectableFlux用于热流REST端点

将ConnectableFlux用于热流REST端点
EN

Stack Overflow用户
提问于 2021-08-24 10:02:05
回答 1查看 45关注 0票数 0

我正在尝试创建一个REST端点,以便使用Reactor订阅热流。

我的流的测试提供程序如下所示:

代码语言:javascript
复制
@Service
class TestProvider {
  fun getStream(resourceId: String): ConnectableFlux<QData> {
    return Flux.create<QData> {
      for (i in 1..10) {
        it.next(QData(LocalDateTime.now().toString(), "next"))
        Thread.sleep(500L)
      }
      it.complete()
    }.publish()
  }
}

我的REST端点服务如下所示:

代码语言:javascript
复制
@Service
class DataService @Autowired constructor(
  private val prv: TestProvider
) {

  private val streams = mutableMapOf<String, ConnectableFlux<QData>>()

  fun subscribe(resourceId: String): Flux<QData> {
    val stream = getStream(resourceId)
    return Flux.push { flux ->
      stream.subscribe{
        flux.next(it)
      }
      flux.complete()
    }
  }

  private fun getStream(resourceId: String): ConnectableFlux<QData> {
    if(streams.containsKey(resourceId).not()) {
      streams.put(resourceId, createStream(resourceId))
    }
    return streams.get(resourceId)!!
  }

  private fun createStream(resourceId: String): ConnectableFlux<QData> {
    val stream = prv.getStream(resourceId)
    stream.connect()
    return stream
  }
}

控制器看起来像这样:

代码语言:javascript
复制
@RestController
class DataController @Autowired constructor(
  private val dataService: DataService
): DataApi {

  override fun subscribe(resourceId: String): Flux<QData> {
    return dataService.subscribe(resourceId)
  }
}

API接口如下所示:

代码语言:javascript
复制
interface DataApi {

  @ApiResponses(value = [
    ApiResponse(responseCode = "202", description = "Client is subscribed", content = [
      Content(mediaType = "application/json", array = ArraySchema(schema = Schema(implementation = QData::class)))
    ])
  ])
  @GetMapping(path = ["/subscription/{resourceId}"], produces = [MediaType.APPLICATION_JSON_VALUE])
  fun subscribe(
    @Parameter(description = "The resource id for which quality data is subscribed for", required = true, example = "example",allowEmptyValue = false)
    @PathVariable("resourceId", required = true) @NotEmpty resourceId: String
  ): Flux<QData>
}

不幸的是,我的curl提供了一个空数组。

有谁知道问题出在哪里吗?提前感谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-24 16:04:26

我不得不在DataService中运行connect()异步

代码语言:javascript
复制
    CompletableFuture.runAsync {
      stream.connect()
    }
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68905552

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档