
SparkException: Task not serializable on class: org.apache.avro.generic.GenericDatumReader

Stack Overflow user
Asked on 2020-06-10 09:33:27
Answers: 1 · Views: 405 · Followers: 0 · Votes: 1

I have JSON input with two fields (size: BigInteger and data: String), where data holds a ZStd-compressed Avro record. The task is to decode these records, and for that I am using spark-avro. However, I get a Task not serializable exception.

Sample data

{
  "data": "7z776qOPevPJF5/0Dv9Rzx/1/i8gJJiQD5MTDGdbeNKKT",
  "size": 231
}

import java.util.Base64
import com.github.luben.zstd.Zstd
import org.apache.avro.Schema
import com.twitter.bijection.Injection
import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.avro.GenericAvroCodecs
import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters._

// Decode one record: Base64 -> Zstd decompress (size = uncompressed length) -> Avro binary via the bijection.
def decode2(input: String, size: Int, avroBijection: Injection[GenericRecord, Array[Byte]], sqlType: StructType): GenericRecord = {
    val compressedGenericRecordBytes = Base64.getDecoder.decode(input)
    val genericRecordBytes = Zstd.decompress(compressedGenericRecordBytes, size)
    avroBijection.invert(genericRecordBytes).get
}

val myRdd = spark.read.format("json").load("/path").rdd

val rows = myRdd.mapPartitions {
    lazy val schema = new Schema.Parser().parse(schemaStr)
    lazy val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    lazy val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

    (iterator) => {
        val myList = iterator.toList
        myList.map { x =>
            val size = x(1).asInstanceOf[Long].intValue
            val data = x(0).asInstanceOf[String]
            decode2(data, size, avroBijection, sqlType)
        }.iterator
    }
}

Exception

files: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[987] at rdd at <console>:346
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
  ... 112 elided
Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericDatumReader
Serialization stack:
    - object not serializable (class: org.apache.avro.generic.GenericDatumReader, value: org.apache.avro.generic.GenericDatumReader@4937cd88)
    - field (class: com.twitter.bijection.avro.BinaryAvroCodec, name: reader, type: interface org.apache.avro.io.DatumReader)
    - object (class com.twitter.bijection.avro.BinaryAvroCodec, com.twitter.bijection.avro.BinaryAvroCodec@6945439c)
    - field (class: $$$$79b2515edf74bd80cfc9d8ac1ba563c6$$$$iw, name: avroBijection, type: interface com.twitter.bijection.Injection)

SO posts I have already tried

  1. Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

Following that post, I updated the decode2 method to take schemaStr as input and convert it to a Schema and SqlType inside the method. The exception stayed the same.

  2. Use Schema to convert AVRO messages with Spark to DataFrame

Created an object-based Injection using the code provided in that post and used it instead. That did not help either; a sketch of this pattern is shown below.
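For reference, the object-based pattern from the second post usually looks like the following. This is only an illustrative sketch under the question's assumptions: `schemaStr` is the same Avro schema JSON string the question's code relies on, and the object name `AvroInjection` is made up here. The idea is that a singleton object is initialized on each executor JVM when first touched, so the non-serializable GenericDatumReader inside the codec never has to be serialized with the task.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs

// Hypothetical singleton wrapper; `schemaStr` is assumed to be the schema JSON
// string already available in the question's code.
object AvroInjection {
  lazy val schema: Schema = new Schema.Parser().parse(schemaStr)
  lazy val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

// Inside the closure the codec would then be obtained as AvroInjection.injection,
// e.g. decode2(data, size, AvroInjection.injection, sqlType)

In a compiled job this usually avoids the Task not serializable error; in spark-shell (note the <console> frames in the trace) the REPL line wrappers can still pull the codec into the closure, which may be why it did not help here.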


1 Answer

Stack Overflow user

Accepted answer

Answered on 2020-06-10 10:38:01

Have you tried

val rows = myRdd.mapPartitions {
    (iterator) => {
        val myList = iterator.toList
        myList.map { x =>
            lazy val schema = new Schema.Parser().parse(schemaStr)
            lazy val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
            lazy val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
            val size = x(1).asInstanceOf[Long].intValue
            val data = x(0).asInstanceOf[String]
            decode2(data, size, avroBijection, sqlType)
        }.iterator
    }
}
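Moving the lazy vals inside the per-record map means nothing non-serializable is captured from the driver, but it also re-parses the schema and rebuilds the codec for every row. A variant that keeps once-per-partition reuse is to construct them inside the function passed to mapPartitions, so they are created on the executor rather than shipped in the closure. This is only a sketch reusing the question's imports, schemaStr and decode2:

val rows = myRdd.mapPartitions { iterator =>
    // Built on the executor, once per partition, never serialized with the task.
    val schema = new Schema.Parser().parse(schemaStr)
    val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

    iterator.map { x =>
        val size = x(1).asInstanceOf[Long].intValue
        val data = x(0).asInstanceOf[String]
        decode2(data, size, avroBijection, sqlType)
    }
}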
Votes: 1
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/62300311
