首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Spark structured streaming中使用来自Kafka的Avro事件

在Spark structured streaming中使用来自Kafka的Avro事件
EN

Stack Overflow用户
提问于 2019-07-19 17:30:29
回答 1查看 687关注 0票数 0

我设计了一个Nifi流,将以Avro格式序列化的JSON事件推送到Kafka topic中,然后尝试在Spark Structured streaming中消费它。

虽然Kafka part运行良好,但Spark Structured streaming无法读取Avro事件。它失败,并出现以下错误。

代码语言:javascript
复制
[Stage 0:>                                                          (0 + 1) / 1]2019-07-19 16:56:57 ERROR Utils:91 - Aborting task
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

Spark代码

代码语言:javascript
复制
import org.apache.spark.sql.types.{ StructField, StructType }
import org.apache.spark.sql.types.{ DecimalType, LongType, ByteType, StringType }
import org.apache.spark.sql.types.DataType._
import scala.collection.Seq
import org.apache.spark._
import spark.implicits._
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import java.nio.file.{Files, Path, Paths}

val spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic_name").load()
val df1 = df.select(from_avro(col("value"),jsonFormatSchema).as("data")).select("data.*")
df1.writeStream.format("console").option("truncate","false").start()
))

Spark中使用的模式

代码语言:javascript
复制
{
 "type": "record",
 "name": "kafka_demo_new",
 "fields": [
  {
   "name": "host",
   "type": "string"
  },
  {
   "name": "event",
   "type": "string"
  },
  {
   "name": "connectiontype",
   "type": "string"
  },
  {
   "name": "user",
   "type": "string"
  },
  {
   "name": "eventtimestamp",
   "type": "string"
  }
 ]
}

Kafka中的示例主题数据

代码语言:javascript
复制
{"host":"localhost","event":"Qradar_Demo","connectiontype":"tcp/ip","user":"user","eventtimestamp":"2018-05-24 23:15:07"}

以下是版本信息

代码语言:javascript
复制
HDP - 3.1.0
Kafka - 2.0.0
Spark - 2.4.0

任何帮助都是非常感谢的。

EN

回答 1

Stack Overflow用户

发布于 2019-07-19 19:21:38

有类似的问题,并发现Kafka / KSQL有一个不同版本的AVRO,这使得其他组件抱怨。

这也可能是你的情况:看看:https://github.com/confluentinc/ksql/issues/1742

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57109543

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档