首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Doobie 0.9.0正确使用Monix 3.2.2 Observable

使用Doobie 0.9.0正确使用Monix 3.2.2 Observable
EN

Stack Overflow用户
提问于 2020-08-23 15:00:02
回答 1查看 109关注 0票数 1

我想将Monix Observable与Doobie (fs2)流一起使用,但似乎无法正常工作。没有流媒体,我的测试应用程序可以很好地退出,但在使用流媒体之后,我的TaskApp似乎挂起了,并且找不到原因。

下面是一个重现问题的最小示例:

代码语言:javascript
复制
package example

import java.util.concurrent.Executors

import doobie.implicits._
import cats.effect.{Blocker, ContextShift, ExitCode, Resource}
import doobie.hikari.HikariTransactor
import monix.eval.{Task, TaskApp}
import com.typesafe.scalalogging.StrictLogging
import fs2.interop.reactivestreams._
import monix.reactive.Observable

import scala.concurrent.ExecutionContext
 
object Hello extends TaskApp with StrictLogging {
    
  private def resources()(implicit contextShift: ContextShift[Task]): Resource[Task, Resources] = {
    for {
      transactor <- Database.transactor("org.postgresql.Driver", "jdbc:postgresql://localhost/fubar", "fubar", "fubar")
    } yield Resources(transactor)
  }

  def run(args: List[String]): Task[ExitCode] = resources().use(task)
    .flatMap(_ => Task { println("All Done!") })
    .flatMap(_ => Task(ExitCode.Success))
  
  def task(resources: Resources): Task[Unit] = {

    val publisher =
      
      sql"""select id from message;"""

        .query[(Long)]
        .stream
        .transact(resources.transactor)
        .toUnicastPublisher()

    Observable.fromReactivePublisher(publisher)
      .foreachL(id => logger.info(id.toString))
    
  }
  
}

case class Resources(transactor: HikariTransactor[Task])

object Database {

  val ecBlocking = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

  def transactor(dbDriver: String, dbUrl: String, dbUser: String, dbPassword: String)(implicit contextShift: ContextShift[Task]): Resource[Task, HikariTransactor[Task]] = {
    HikariTransactor.newHikariTransactor[Task](dbDriver, dbUrl, dbUser, dbPassword, ecBlocking, Blocker.liftExecutionContext(ecBlocking))
  }

}

我已经根据Monix文档将fs2流转换为Monix observable:https://monix.io/docs/current/reactive/observable.html#fs2

我是否需要以某种方式关闭fs2流或Observable以使应用程序干净地退出?感谢任何让它工作的技巧,或者是如何正确调试它的技巧。

EN

回答 1

Stack Overflow用户

发布于 2020-08-23 23:46:10

问题是ExecutionContext需要关闭。请参阅作者的答案here

正确的用法可以在in the documentation上看到。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63544248

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档