NoSuchMethodError for org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.<init> when reading from Azure Event Hub and using Protobuf

Stack Overflow user
Asked on 2021-03-01 14:31:30
1 answer · 230 views · 0 followers · 0 votes

I am trying to submit a Spark job to a cluster. The job reads a binary-encoded streaming DataFrame from Azure Event Hub and then transforms it using the Protobuf file format.

build.sbt

import sbtassembly.AssemblyPlugin.autoImport.ShadeRule

name := "AdQualitySpark"
version := "0.1"
scalaVersion := "2.12.12"

unmanagedJars in Compile += file("lib/geneva-java-0.1.0.jar")
unmanagedJars in Compile += file("lib/bond-7.0.0-preview-2017-11-22.jar")
unmanagedJars in Compile += file("lib/azure-cosmosdb-spark_2.4.0_2.11-3.6.7-uber.jar")
resolvers += "MMLSpark Repo" at "https://mmlspark.azureedge.net/maven"

libraryDependencies ++= Seq(
  "org.apache.commons" % "commons-lang3" % "3.1",
  "org.apache.spark" %% "spark-core" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.0",
  "org.apache.kafka" % "kafka-clients" % "0.8.2.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "3.0.0" % "provided",
//  "com.databricks" %% "spark-csv" % "1.5.0",
  "org.rogach" %% "scallop" % "3.1.5",
  "org.scalaj" %% "scalaj-http" % "2.4.1",
  "com.microsoft.azure" %% "azure-eventhubs-spark" % "2.3.15",
  "org.scalatest" %% "scalatest" % "3.0.0" % "provided",
  "com.microsoft.ml.spark" %% "mmlspark" % "1.0.0-rc3-27-b1c14008-SNAPSHOT",
  "com.holdenkarau" %% "spark-testing-base" % "3.0.0_1.0.0" % "provided",
  "com.typesafe" % "config" % "1.4.0",
  "com.thesamet.scalapb" %% "compilerplugin" % "0.9.4",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion, //% "protobuf",
  "com.google.protobuf" % "protobuf-java-util" % "3.11.1",
  "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.9.0",
  "com.thesamet.scalapb" %% "scalapb-json4s" % "0.9.3",
  "com.microsoft.azure" % "azure-data-lake-store-sdk" % "2.3.8",
  "com.databricks" %% "dbutils-api" % "0.0.4",
  "io.delta" %% "delta-core" % "0.7.0",
  "com.microsoft.sqlserver" % "mssql-jdbc" % "8.2.1.jre8",
  "org.apache.spark" %% "spark-avro" % "3.0.0"
  // "com.databricks" %% "spark-avro" % "3.2.0"
)

// Needed for CosmosDB Spark connector.
dependencyOverrides ++= {
  Seq(
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
    "com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
    "com.google.guava" % "guava" % "15.0",
    "org.json4s" %% "json4s-jackson" % "3.6.10"
  )
}


/*
lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % "0.8.2.1" excludeAll(excludeJpountz) // add more exclusions here*/

assemblyShadeRules in assembly := Seq(
    ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll
)

scalacOptions += "-Xmacro-settings:materialize-derivations"
javaOptions in assembly += "-Xmx2g"

assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  cp filter { f =>
    !(f.data.getName.contains("mml") || f.data.getName.contains("http") || f.data.getName.contains("proton")
      || f.data.getName.contains("spray") || f.data.getName.contains("scallop") || f.data.getName.contains("compat")
      || f.data.getName.contains("eventhub") || f.data.getName.contains("kafka") || f.data.getName.contains("scalapb")
      || f.data.getName.contains("compilerplugin") || f.data.getName.contains("lenses") || f.data.getName.contains("protoc")
      || f.data.getName.contains("frameless") || f.data.getName.contains("shadeproto")
      || f.data.getName.contains("geneva") || f.data.getName.contains("mssql-jdbc")|| f.data.getName().contains("spark-avro")
      || f.data.getName.contains("bond")|| f.data.getName.contains("cosmosdb") || f.data.getName.contains("delta"))
  }
  /*
    cp foreach  {f => println(f.data.getName,f.data.getAbsoluteFile().length())}
    cp*/
}

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)
/*
(scalastyleConfig in Test) := baseDirectory.value / "scalastyleconfig.xml"
logBuffered in Test := false


lazy val testScalastyle = taskKey[Unit]("testScalastyle")
testScalastyle := scalastyle.in(Test).toTask("").value
(test in Test) := ((test in Test) dependsOn testScalastyle).value */

lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
compileScalastyle := scalastyle.in(Compile).toTask("").value
(compile in Compile) := ((compile in Compile) dependsOn compileScalastyle).value

parallelExecution in Test := false
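// Note: the assemblyMergeStrategy block below repeats the definition given earlier
// in this file; with sbt's `:=` the later assignment simply wins, so one copy can be removed.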

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// todo: start including tests in build
logBuffered in Test := false

The source code that throws the exception:

def editorialAdStream(sc: SparkContext, spark: SparkSession): Dataset[AdEntity] = {

  val ehConnectionString = ConnectionStringBuilder(ConnectionString)
    .setEventHubName(EventHubName)
    .build

  val customEventhubParameters =
    EventHubsConf(ehConnectionString)
      .setConsumerGroup(ConsumerGroup)
      .setEndingPosition(EventPosition.fromEndOfStream)
      .setMaxEventsPerTrigger(MaxEventsPerTrigger)
      .setPrefetchCount(PrefetchCount)

  val binaryAdStream = spark
    .readStream
    .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
    .options(customEventhubParameters.toMap)
    .load()
    .selectExpr("body")
    .as(Encoders.BINARY)

  val adStream: Dataset[EditorialAdEntity] =
    binaryAdStream
      .map(m => EditorialAdEntity().mergeFrom(CodedInputStream.newInstance(m)))
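For context: the implicit Encoder[EditorialAdEntity] that this .map call requires is supplied by scalapb-spark's frameless-backed TypedExpressionEncoder (that is the scalapb.spark.Implicits entry visible in the stack trace below), so the call site presumably has an import along these lines in scope:

import scalapb.spark.Implicits._  // supplies Encoder instances for ScalaPB-generated messages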

The exception stack trace:

java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.<init>(Lorg/apache/spark/sql/types/StructType;ZLscala/collection/Seq;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/reflect/ClassTag;)V
    at frameless.TypedExpressionEncoder$.apply(TypedExpressionEncoder.scala:45)
    at scalapb.spark.Implicits.typedEncoderToEncoder(TypedEncoders.scala:125)
    at scalapb.spark.Implicits.typedEncoderToEncoder$(TypedEncoders.scala:122)
    at scalapb.spark.Implicits$.typedEncoderToEncoder(TypedEncoders.scala:128)
    at Utils.MessagingQueues.EventHubSourceReader$.editorialAdStream(EventHubSourceReader.scala:57)
    at Utils.MessagingQueues.SourceReader$.readEditorialAdSource(SourceReader.scala:39)
    at Workflows.Streaming.PROD.UnifiedNRT.UnifiedNRTHelper$.fetchInputStream(UnifiedNRTHelper.scala:62)
    at Workflows.Streaming.PROD.UnifiedNRT.UnifiedNRT$.main(UnifiedNRT.scala:50)

Facts:
1. The project uses Spark 3.0.
2. The Scala version is 2.12.12.
3. Following the official documentation, I tried changing the ScalaPB library versions, but it did not help: https://scalapb.github.io/docs/sparksql/#setting-up-your-project
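Worth noting: the constructor in the error message (StructType, Boolean, Seq[Expression], Expression, ClassTag) is the Spark 2.x ExpressionEncoder constructor, which Spark 3.0 removed, and sparksql-scalapb 0.9.0 depends on a frameless build compiled against Spark 2.x. A minimal check, pasted into a Spark 3.0 spark-shell, shows the old constructor is indeed gone:

// Print the ExpressionEncoder constructors present at runtime; on Spark 3.0
// the five-argument Spark 2.x constructor named in the error does not appear.
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
classOf[ExpressionEncoder[_]].getConstructors.foreach(println)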

Please help resolve this exception.


1 Answer

Stack Overflow user

Answered on 2021-06-02 18:14:24

This problem is caused by a version mismatch between the ScalaPB libraries and Scala. If you run into it, try the following configuration:

scalaVersion := "2.12.10"

/* Common variables */
val sparkVersion = "3.0.1"
libraryDependencies += "com.thesamet.scalapb" %% "sparksql-scalapb" % "0.11.0-RC1"

libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.10.10"
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.0")
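Note that with sbt-protoc 1.0.0 the addSbtPlugin line and the compilerplugin dependency belong in project/plugins.sbt rather than build.sbt. A minimal sketch of that file, following the ScalaPB setup docs:

// project/plugins.sbt
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.0")
libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.10.10"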

Also add the following lines to build.sbt:

assemblyShadeRules in assembly ++= Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "shadeproto.@1").inAll,
  ShadeRule.rename("scala.collection.compat.**" -> "shadecompat.@1").inAll,
)
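These two renames match what the ScalaPB sparksql docs recommend: shading com.google.protobuf avoids clashes with the older protobuf-java already present on a Spark/Hadoop classpath, and shading scala.collection.compat avoids clashes with whatever copy of that library other dependencies on the cluster bring in.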

Reload the project structure and rebuild the project.
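To confirm the shading took effect, one option is to list the renamed packages inside the assembled jar. The jar path below is an assumption based on the name and version in build.sbt; adjust it to your actual output:

// Scratch check (paste into the Scala REPL's :paste mode or a small script):
// prints a few entries proving protobuf and collection-compat classes were renamed.
import java.util.jar.JarFile
import scala.collection.JavaConverters._
new JarFile("target/scala-2.12/AdQualitySpark-assembly-0.1.jar")  // hypothetical path
  .entries().asScala
  .map(_.getName)
  .filter(n => n.startsWith("shadeproto/") || n.startsWith("shadecompat/"))
  .take(5)
  .foreach(println)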

Votes: 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66417587