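I'm trying out testcontainers for integration tests of a pipeline, but I can't figure out how to get the bootstrapServers, at least in the latest testcontainers version, and create a specific topic there. How can I use 'containerDef' to get the bootstrap servers and add a topic?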
import com.dimafeng.testcontainers.{ContainerDef, KafkaContainer}
import com.dimafeng.testcontainers.scalatest.TestContainerForAll
import munit.FunSuite
import org.apache.spark.sql.SparkSession

class Mykafkatest extends FunSuite with TestContainerForAll {
  //val kafkaContainer: KafkaContainer = KafkaContainer("confluentinc/cp-kafka:5.4.3")
  override val containerDef: ContainerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    val sparkSession: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("Unit testing")
      .getOrCreate()

    // How do I add a topic to that container?
    // This is not possible:
    val servers = container.bootstrapServers

    val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", servers)
      .option("subscribe", "topic1")
      .load()

    df.show(false)
  })
}

My sbt configuration:
lazy val root = project
  .in(file("./pipeline"))
  .settings(
    organization := "org.example",
    name := "spark-stream",
    version := "0.1",
    scalaVersion := "2.12.10",
    libraryDependencies := Seq(
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.3" % Compile,
      "org.apache.spark" %% "spark-sql" % "3.0.3" % Compile,
      "com.dimafeng" %% "testcontainers-scala-munit" % "0.39.5" % Test,
      "org.dimafeng" %% "testcontainers-scala-kafka" % "0.39.5" % Test,
      "org.scalameta" %% "munit" % "0.7.28" % Test
    ),
    testFrameworks += new TestFramework("munit.Framework"),
    Test / fork := true
  )

The documentation does not show a complete example: https://www.testcontainers.org/modules/kafka/
Posted on 2021-10-02 14:27:22
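The only problem here is that you are explicitly widening that KafkaContainer.Def to ContainerDef.
The type of the container that withContainers passes to your test, Containers, is determined by a path-dependent type in the provided ContainerDef: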
trait TestContainerForAll extends TestContainersForAll { self: Suite =>

  val containerDef: ContainerDef

  final override type Containers = containerDef.Container

  override def startContainers(): containerDef.Container = {
    containerDef.start()
  }

  // inherited from TestContainersSuite
  def withContainers[A](runTest: Containers => A): A = {
    val c = startedContainers.getOrElse(throw IllegalWithContainersCall())
    runTest(c)
  }
}

trait ContainerDef {

  type Container <: Startable with Stoppable

  protected def createContainer(): Container

  def start(): Container = {
    val container = createContainer()
    container.start()
    container
  }
}

The moment you explicitly specify the type ContainerDef in override val containerDef: ContainerDef = KafkaContainer.Def(), the whole "type trick" breaks, and the Scala compiler is left with only type Container <: Startable with Stoppable instead of KafkaContainer.
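The effect can be reproduced without the library. The following is a minimal sketch using simplified stand-ins: Def, Kafka, KafkaDef and PathDependentDemo are hypothetical names invented for illustration, not testcontainers-scala types, and the returned address is a placeholder.

```scala
// Hypothetical, simplified stand-in for ContainerDef: the concrete
// container type is exposed only through the abstract type member.
trait Def {
  type Container
  def start(): Container
}

// Hypothetical stand-in for KafkaContainer.
final class Kafka {
  def bootstrapServers: String = "localhost:9092" // placeholder value
}

// Hypothetical stand-in for KafkaContainer.Def: it fixes Container to Kafka.
final class KafkaDef extends Def {
  type Container = Kafka
  def start(): Kafka = new Kafka
}

object PathDependentDemo {
  def main(args: Array[String]): Unit = {
    // No annotation: the inferred type is KafkaDef, so the compiler
    // knows that precise.Container is Kafka.
    val precise = new KafkaDef
    val servers: String = precise.start().bootstrapServers
    println(servers)

    // Annotated with the base trait: widened.Container is now abstract,
    // so Kafka-specific members are no longer visible.
    val widened: Def = new KafkaDef
    val started = widened.start()
    // started.bootstrapServers // does not compile if uncommented
  }
}
```

The only difference between the two vals is the type annotation, which is exactly what happens with containerDef in the test class.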
So just remove the explicit ContainerDef type annotation, and val servers = container.bootstrapServers will work as expected.
import com.dimafeng.testcontainers.KafkaContainer
import com.dimafeng.testcontainers.munit.TestContainerForAll
import munit.FunSuite

class Mykafkatest extends FunSuite with TestContainerForAll {
  override val containerDef = KafkaContainer.Def()

  test("do something")(withContainers { container =>
    //...
    val servers = container.bootstrapServers
    println(servers)
    //...
  })
}

https://stackoverflow.com/questions/68914485
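The question also asks how to add a topic, which the answer above does not cover. One possible approach is the Kafka AdminClient from kafka-clients. This is a sketch under assumptions: it assumes kafka-clients is on the test classpath (it is normally pulled in transitively by spark-sql-kafka-0-10), and KafkaTopics and createTopic are hypothetical helper names, not part of testcontainers-scala.

```scala
import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

// Hypothetical helper, not part of testcontainers-scala.
object KafkaTopics {

  // Creates a topic on the given broker and blocks until the broker confirms it.
  def createTopic(
      bootstrapServers: String,
      name: String,
      partitions: Int = 1,
      replicationFactor: Short = 1
  ): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

    val admin = AdminClient.create(props)
    try {
      val topic = new NewTopic(name, partitions, replicationFactor)
      // all().get() waits for completion and throws if creation failed.
      admin.createTopics(Collections.singletonList(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}
```

Inside withContainers it could be called as KafkaTopics.createTopic(container.bootstrapServers, "topic1") before the Spark stream is started. A replication factor of 1 is used because the test container runs a single broker.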