首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >模拟输入dstream apache火花

模拟输入dstream apache火花
EN

Stack Overflow用户
提问于 2015-06-25 11:57:36
回答 1查看 867关注 0票数 4

我试图在编写火花流单元测试时模拟输入dstream。我能够模拟RDD,但当我试图将它们转换为dstream时,dstream对象将变为空对象。我用了以下代码-

代码语言:javascript
复制
val lines = mutable.Queue[RDD[String]]()
val dstream = streamingContext.queueStream(lines)

// append data to DStream
lines += sparkContext.makeRDD(Seq("To be or not to be.", "That is the question."))

任何与此相关的帮助都将受到高度赞赏。

EN

回答 1

Stack Overflow用户

发布于 2022-09-30 18:41:17

为所有DataFrameWriter,DataFrameReader,DataStreamReader,DataStreamWriter编写UT

使用上述步骤的示例测试用例

  1. 模拟
  2. 行为
  3. 断言

基于Maven的依赖关系

代码语言:javascript
复制
<groupId>org.scalatestplus</groupId>
<artifactId>mockito-3-4_2.11</artifactId>
<version>3.2.3.0</version>
<scope>test</scope>


<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>2.13.0</version>
<scope>test</scope>

让我们使用一个星火类的例子,其中源是Hive,接收器是JDBC

代码语言:javascript
复制
class DummySource extends SparkPipeline {
  /**
   * Method to read the source and create a Dataframe
   *
   * @param sparkSession : SparkSession
   * @return : DataFrame
   */
  override def read(spark: SparkSession): DataFrame = {
    spark.read.table("Table_Name").filter("_2 > 1")
  }

  /**
   * Method to transform the dataframe
   *
   * @param df : DataFrame
   * @return : DataFrame
   */
  override def transform(df: DataFrame): DataFrame = ???

  /**
   * Method to write/save the Dataframe to a target
   *
   * @param df : DataFrame
   *
   */
  override def write(df: DataFrame): Unit =
    df.write.jdbc("url", "targetTableName", new Properties())
}

嘲弄读

代码语言:javascript
复制
test("Spark read table") {
  val dummySource = new DummySource()
  val sparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("mocking spark test")
    .getOrCreate()
  val testData = Seq(("one", 1), ("two", 2))
  val df = sparkSession.createDataFrame(testData)
  df.show()
  val mockDataFrameReader = mock[DataFrameReader]
  val mockSpark = mock[SparkSession]
  when(mockSpark.read).thenReturn(mockDataFrameReader)
  when(mockDataFrameReader.table("Table_Name")).thenReturn(df)
  dummySource.read(mockSpark).count() should be(1)
}

嘲讽写

代码语言:javascript
复制
  test("Spark write") {
  val dummySource = new DummySource()
  val mockDf = mock[DataFrame]
  val mockDataFrameWriter = mock[DataFrameWriter[Row]]
  when(mockDf.write).thenReturn(mockDataFrameWriter)
  when(mockDataFrameWriter.mode(SaveMode.Append)).thenReturn(mockDataFrameWriter)
  doNothing().when(mockDataFrameWriter).jdbc("url", "targetTableName", new Properties())
  dummySource.write(df = mockDf)
}

参考文件中的流代码

参考文献:https://medium.com/walmartglobaltech/spark-mocking-read-readstream-write-and-writestream-b6fe70761242

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31049731

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档