我在我的Spark过程中有很多步骤,我已经创建了套件来测试每个步骤的功能,我的步骤的输出就是下一个步骤的输入。
但是,我希望创建一个通过ETL过程运行两次的测试--例如:
原始数据>> ETL_Step_1 >> ETL_Step_2 >> ETL_Step_3 >> Delta Data >> ETL_Step_1 >> ETL_Step_2 >> ETL_Step_3
我从Reporter那里得到了一个错误,因为它不能运行与以前运行的套件同名的套件(例如,我不能运行名为ETL_Step_1的套件两次)。如果复制套件并重命名副本以执行以下操作,问题就会消失:
原始数据>> ETL_Step_1 >> ETL_Step_2 >> ETL_Step_3 >> Delta Data >> ETL_Step_1_2 >> ETL_Step_2_2 >> ETL_Step_3_2
有没有更好的方法来运行我的测试而不重复和重命名测试套件?
发布于 2020-04-06 18:50:14
您可以将所有ETL步骤放入定义(集合)并运行它们。
import org.scalatest.FunSpec
import com.github.mrpowers.spark.fast.tests.DataFrameComparer
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._
import org.apache.spark.sql.types.{IntegerType, StringType}
describe("etl collection") {
it("can run etls that are organized in a map") {
val sourceDF = spark.createDF(
List(
("bob", 14),
("liz", 20)
), List(
("name", StringType, true),
("age", IntegerType, true)
)
)
val etlDefinition = new EtlDefinition(
name = "example",
sourceDF = sourceDF, // etl step 1
transform = someTransform(), // etl step 2
write = someWriter(), // etl step 3
hidden = false
)
val etls = scala.collection.mutable.Map[String, EtlDefinition]("example" -> etlDefinition)
etls += ("ex2" -> etlDefinition)
etls("example").process()
}
}另一种选择可能是从最可缩放的eventually。最后的一个小例子是:
Post("/something", body) ~> routes ~> check {
response should be a pendingResponse
eventually {
Post("/something", body) ~> routes ~> check {
response should be a expectedResponse
}
}
}而不是Post,你可以调用一些方法。
你可以在这里这里和鳞状
更新:
it ("your definition") {
eventually { Thread.sleep(50); step1() should be A // assertion }
eventually { Thread.sleep(50); step2() should be B }
eventually { Thread.sleep(50); step3() should be C }
}如果你愿意的话,你可以把它们嵌套起来
https://stackoverflow.com/questions/61066103
复制相似问题