我试图在编写火花流单元测试时模拟输入dstream。我能够模拟RDD,但当我试图将它们转换为dstream时,dstream对象将变为空对象。我用了以下代码-
val lines = mutable.Queue[RDD[String]]()
val dstream = streamingContext.queueStream(lines)
// append data to DStream
lines += sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))任何与此相关的帮助都将受到高度赞赏。
发布于 2022-09-30 18:41:17
为所有DataFrameWriter,DataFrameReader,DataStreamReader,DataStreamWriter编写UT
使用上述步骤的示例测试用例
基于Maven的依赖关系
<groupId>org.scalatestplus</groupId>
<artifactId>mockito-3-4_2.11</artifactId>
<version>3.2.3.0</version>
<scope>test</scope>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>2.13.0</version>
<scope>test</scope>让我们使用一个星火类的例子,其中源是Hive,接收器是JDBC
class DummySource extends SparkPipeline {
/**
* Method to read the source and create a Dataframe
*
* @param sparkSession : SparkSession
* @return : DataFrame
*/
override def read(spark: SparkSession): DataFrame = {
spark.read.table("Table_Name").filter("_2 > 1")
}
/**
* Method to transform the dataframe
*
* @param df : DataFrame
* @return : DataFrame
*/
override def transform(df: DataFrame): DataFrame = ???
/**
* Method to write/save the Dataframe to a target
*
* @param df : DataFrame
*
*/
override def write(df: DataFrame): Unit =
df.write.jdbc("url", "targetTableName", new Properties())
}嘲弄读
test("Spark read table") {
val dummySource = new DummySource()
val sparkSession = SparkSession
.builder()
.master("local[*]")
.appName("mocking spark test")
.getOrCreate()
val testData = Seq(("one", 1), ("two", 2))
val df = sparkSession.createDataFrame(testData)
df.show()
val mockDataFrameReader = mock[DataFrameReader]
val mockSpark = mock[SparkSession]
when(mockSpark.read).thenReturn(mockDataFrameReader)
when(mockDataFrameReader.table("Table_Name")).thenReturn(df)
dummySource.read(mockSpark).count() should be(1)
}嘲讽写
test("Spark write") {
val dummySource = new DummySource()
val mockDf = mock[DataFrame]
val mockDataFrameWriter = mock[DataFrameWriter[Row]]
when(mockDf.write).thenReturn(mockDataFrameWriter)
when(mockDataFrameWriter.mode(SaveMode.Append)).thenReturn(mockDataFrameWriter)
doNothing().when(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())
dummySource.write(df = mockDf)
}

参考文件中的流代码
https://stackoverflow.com/questions/31049731
复制相似问题