首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >ZIO和多个回调

ZIO和多个回调
EN

Stack Overflow用户
提问于 2021-05-04 01:50:53
回答 1查看 123关注 0票数 0

我对使用ZIO非常陌生。我目前正在用Scala编写一个密码交易机器人,同时我也在尝试学习ZIO。现在我正在打开一个websocket,这个websocket会给出多个回调,直到它被关闭,我正在努力将其集成到我的代码中。我当前的代码:

代码语言:javascript
复制
object Main extends zio.App with Logging {
   def run(args: List[String]): URIO[Any with Console, ExitCode] = Configuration.getConfiguration.fold(onError, start).exitCode

   private val interval: CandlestickInterval = CandlestickInterval.ONE_MINUTE

   private def onError(exception: ConfigurationException): ZIO[Any, Throwable, Unit]  = {
     logger.info("Could not initialize traderbot!")
     logger.error(exception.getMessage)
     IO.succeed()
   }

   private final def start(configuration: Configuration): ZIO[Any, Throwable, Unit] = {
      for {
        binanceClient <- IO.succeed(BinanceApiClientFactory.newInstance(configuration.apiKey, configuration.secret))
        webSocketClient <- IO.succeed(binanceClient.newWebSocketClient())
        candlesticks <- Task.effectAsync[CandlestickEvent] {
          callback =>
            webSocketClient.onCandlestickEvent(
            "adaeur",
            interval, d => callback(IO.succeed(d))
          )
        })
        // TODO Calculate RSI from candlesticks.
   } yield candlesticks
 }
}

我想继续接收烛台事件,并保持功能正常。我看到了一些关于Zio Streams的东西,但我找不到处理重复回调的示例,而且很容易理解。现在我不能使用我的Candlestick代码来进行理解。

耽误您时间,实在对不起!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-05-09 21:48:50

不幸的是,在使用effectAsync时,ZIO不能处理多个回调,因为数据类型是基于单个成功或失败值的。

不过,您可以使用ZStream,它有一个类似形状的运算符,可以多次调用:

代码语言:javascript
复制
private final def start(configuration: Configuration): ZStream[Any, Throwable, Unit] = {
  val candlesticks = ZStream.unwrap(
    IO.effectTotal {
      val client = BinanceApiClientFactory
        .newInstance(configuration.apiKey, configuration.secret)
        .newWebSocketClient()

      // This variant accepts a return value in the `Left` which 
      // is called when during shutdown to make sure that the websocket is 
      // cleaned up
      ZStream.effectAsyncInterrupt { cb => 
        val closeable = webSocketClient.onCancelstickEvent(
          "adaeur",
          interval,
          d => cb(IO.succeed(d)
        ) 

        Left(UIO(closeable.close()))
      }
  )

  for {
    candlestick <- candlesticks
    // TODO Calculate RSI from candlesticks.
  } yield ()
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67373533

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档