首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用testcontainers测试kafka和spark

使用testcontainers测试kafka和spark
EN

Stack Overflow用户
提问于 2021-08-24 21:51:16
回答 1查看 282关注 0票数 4

我正在尝试检查testcontainers作为集成测试的流水线,但我不知道如何获得bootstrapServers,至少在最新的testcontainers版本中,并在那里创建一个特定的主题。如何使用'containerDef‘来提取引导服务器并添加主题?

代码语言:javascript
复制
import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
  //val kafkaContainer: KafkaContainer      = KafkaContainer("confluentinc/cp-kafka:5.4.3")
  override val containerDef: ContainerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Unit testing")
      .getOrCreate()

    // How add a topic in that container?

    // This is not posible:
    val servers=container.bootstrapServers

    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", "topic1")
      .load()


    df.show(false)

  })

}

我的sbt配置:

代码语言:javascript
复制
lazy val root = project
  .in(file("./pipeline"))
  .settings(
    organization := "org.example",
    name := "spark-stream",
    version := "0.1",
    scalaVersion := "2.12.10",
    libraryDependencies := Seq(
      "org.apache.spark" %% "spark-sql-kafka-0-10"       % "3.0.3"  % Compile,
      "org.apache.spark" %% "spark-sql"                  % "3.0.3"  % Compile,
      "com.dimafeng"     %% "testcontainers-scala-munit" % "0.39.5" % Test,
      "org.dimafeng"     %% "testcontainers-scala-kafka" % "0.39.5" % Test,
      "org.scalameta"    %% "munit"                      % "0.7.28" % Test
    ),
    testFrameworks += new TestFramework("munit.Framework"),
    Test / fork := true
  )

文档没有显示完整的示例:https://www.testcontainers.org/modules/kafka/

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-02 14:27:22

这里唯一的问题是您显式地将该KafkaContainer.Def转换为ContainerDef

withContianersContainter提供的容器类型由path dependent type在提供的ContainerDef中决定,

代码语言:javascript
复制
trait TestContainerForAll extends TestContainersForAll { self: Suite =>

  val containerDef: ContainerDef

  final override type Containers = containerDef.Container

  override def startContainers(): containerDef.Container = {
    containerDef.start()
  }

  // inherited from TestContainersSuite
  def withContainers[A](runTest: Containers => A): A = {
    val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
    runTest(c)
  }

}
代码语言:javascript
复制
trait ContainerDef {

  type Container <: Startable with Stoppable

  protected def createContainer(): Container

  def start(): Container = {
    val container = createContainer()
    container.start()
    container
  }
}

override val containerDef: ContainerDef = KafkaContainer.Def()中显式指定类型ContainerDef的那一刻,这就打破了整个“类型把戏”,因此Scala编译器只剩下一个type Container <: Startable with Stoppable而不是KafkaContainer

因此,只需删除转换为ContainerDef显式类型,val servers = container.bootstrapServers就会按预期工作。

代码语言:javascript
复制
import com.dimafeng.testcontainers.KafkaContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import munit.FunSuite

class Mykafkatest extends FunSuite with TestContainerForAll {
  override val containerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    //...

    val servers = container.bootstrapServers

    println(servers)

    //...
  })
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68914485

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档