Cloudflow cannot read Avro messages from Kafka

Stack Overflow user
Asked 2021-01-08 13:55:17
Answers: 1, views: 60, followers: 0, votes: 0

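I am developing an application with Lightbend Cloudflow that consumes from an external Kafka topic.

The external Kafka topic contains Avro records, and I can read the messages when I use kafka-avro-console-consumer together with the schema registry.

In the same setup, however, Cloudflow fails to deserialize the messages and throws an exception.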

17:34:07.595 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message

com.twitter.bijection.InversionFailure: Failed to invert: [B@503e211a

    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)

    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)

    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)

    at scala.util.Failure.recoverWith(Try.scala:236)

    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)

    at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)

    at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)

    at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted$1(AvroCodec.scala:39)

    at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)

    at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)

    at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext$2(AkkaStreamletContextImpl.scala:141)

    at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map$1(FlowWithContextOps.scala:70)

    at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:53)

    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)

    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)

    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)

    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)

    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)

    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)

    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)

    at akka.actor.Actor.aroundReceive(Actor.scala:537)

    at akka.actor.Actor.aroundReceive$(Actor.scala:535)

    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)

    at akka.actor.ActorCell.invoke(ActorCell.scala:547)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)

    at akka.dispatch.Mailbox.run(Mailbox.scala:231)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)

    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25

    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)

    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)

    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)

    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)

    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)

    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)

    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)

    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)

    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)

    at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert$3(AvroCodecs.scala:332)

    at com.twitter.bijection.Inversion$.$anonfun$attempt$1(Inversion.scala:32)

    at scala.util.Try$.apply(Try.scala:213)

    ... 28 common frames omitted

17:34:07.595 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message

com.twitter.bijection.InversionFailure: Failed to invert: [B@5c9c541f

    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)

    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)

    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)

    at scala.util.Failure.recoverWith(Try.scala:236)

    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)

    at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)

    at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)

    at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted$1(AvroCodec.scala:39)

    at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)

    at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)

    at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext$2(AkkaStreamletContextImpl.scala:141)

    at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map$1(FlowWithContextOps.scala:70)

    at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:53)

    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)

    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)

    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)

    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)

    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)

    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)

    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)

    at akka.actor.Actor.aroundReceive(Actor.scala:537)

    at akka.actor.Actor.aroundReceive$(Actor.scala:535)

    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)

    at akka.actor.ActorCell.invoke(ActorCell.scala:547)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)

    at akka.dispatch.Mailbox.run(Mailbox.scala:231)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)

    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25

    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)

    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)

    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)

    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)

    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)

    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)

    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)

    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)

    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)

    at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert$3(AvroCodecs.scala:332)

    at com.twitter.bijection.Inversion$.$anonfun$attempt$1(Inversion.scala:32)

    at scala.util.Try$.apply(Try.scala:213)

    ... 28 common frames omitted

17:34:07.596 [consumer-akka.actor.default-dispatcher-4] ERROR cloudflow.streamlets.CodecInlet$ - Data decoding error, skipping message

com.twitter.bijection.InversionFailure: Failed to invert: [B@522b512f

    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)

    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)

    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)

    at scala.util.Failure.recoverWith(Try.scala:236)

    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)

    at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:330)

    at com.twitter.bijection.avro.BinaryAvroCodec.invert(AvroCodecs.scala:320)

    at cloudflow.streamlets.avro.AvroSerde.$anonfun$inverted$1(AvroCodec.scala:39)

    at cloudflow.streamlets.avro.AvroSerde.decode(AvroCodec.scala:44)

    at cloudflow.streamlets.avro.AvroCodec.decode(AvroCodec.scala:34)

    at cloudflow.akkastream.AkkaStreamletContextImpl.$anonfun$sourceWithContext$2(AkkaStreamletContextImpl.scala:141)

    at akka.stream.scaladsl.FlowWithContextOps.$anonfun$map$1(FlowWithContextOps.scala:70)

    at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:53)

    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)

    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)

    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)

    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)

    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)

    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)

    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)

    at akka.actor.Actor.aroundReceive(Actor.scala:537)

    at akka.actor.Actor.aroundReceive$(Actor.scala:535)

    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)

    at akka.actor.ActorCell.invoke(ActorCell.scala:547)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)

    at akka.dispatch.Mailbox.run(Mailbox.scala:231)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)

    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25

    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)

    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)

    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)

    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)

    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)

    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)

    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)

    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)

    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)

    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)

    at com.twitter.bijection.avro.BinaryAvroCodec.$anonfun$invert$3(AvroCodecs.scala:332)

    at com.twitter.bijection.Inversion$.$anonfun$attempt$1(Inversion.scala:32)

    at scala.util.Try$.apply(Try.scala:213)

    ... 28 common frames omitted

1 Answer

Stack Overflow user

Accepted answer

Posted 2021-01-09 01:21:30

com.twitter.bijection.avro.BinaryAvroCodec does not work with the Confluent Schema Registry format.

You need to adjust the Kafka client's deserializer settings to use the appropriate KafkaAvroDeserializer from Confluent.
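
To illustrate why a plain binary Avro decoder trips over these records: messages written by Confluent's serializer start with a 5-byte header (one magic byte `0`, then a 4-byte big-endian schema ID) before the Avro payload, so a decoder that starts at byte 0 reads header bytes as field data, which may be what produces the "Length is negative" error above. The sketch below only shows how that header can be inspected and stripped; the class and method names are hypothetical and are not part of Cloudflow, Bijection, or the Confluent client. Decoding the remaining bytes with a fixed schema only works if the writer's schema matches the reader's, which is what KafkaAvroDeserializer resolves through the registry.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, for illustration only: inspects the Confluent wire-format header.
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0; // first byte of every Confluent-framed message
    static final int HEADER_SIZE = 5;   // 1 magic byte + 4-byte schema ID

    // Returns the schema ID stored in bytes 1..4 (big-endian).
    static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Returns the raw Avro binary payload that follows the header.
    static byte[] payload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    // Rejects messages that are too short or do not start with the magic byte.
    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
    }
}
```

Checking the first byte of a failing record this way is a quick test of whether the topic really carries Confluent-framed data before changing any deserializer configuration.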

Votes: 0
Original page content provided by Stack Overflow; translation support provided by Tencent Cloud's IT-domain engine.
Original link:

https://stackoverflow.com/questions/65624078
