首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从Chill 0.6.0 (Kryo 2.21)迁移到0.9.5 (Kryo 4.0.2)并反序列化旧消息

从Chill 0.6.0 (Kryo 2.21)迁移到0.9.5 (Kryo 4.0.2)并反序列化旧消息
EN

Stack Overflow用户
提问于 2021-01-20 16:37:35
回答 1查看 64关注 0票数 0

我们使用Chill-bijection来序列化/反序列化Kafka与Kryo之间的消息。我们的应用程序的旧版本使用的是依赖于com.esotericsoftware.kryo.kryo-2.21.jar的Chill 0.6.0,而我们的应用程序的新版本使用的是依赖于com.esotericsoftware.kryo-shaded-4.0.2.jar的Chill 0.9.5。

为了最大限度地减少停机时间,我们的应用程序的新版本需要能够读取由旧版本的应用程序编写的消息,但它会失败,并返回一个错误:

代码语言:javascript
复制
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition prod_2x02_external_entity_updates-0 at offset 8764198. If needed, please seek past the record to continue consumption.
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@14122f45
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
    at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
    at scala.util.Failure.recoverWith(Try.scala:236)
    at com.twitter.bijection.Inversion$.attempt(Inversion.scala:32)
    at com.X.backend.serialization.CustomKafkaKryoDeserializer.deserialize(KafkaKryoSerialization.scala:38)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at com.X.common.kafka.KafkaSubscriber$$anonfun$brokerFound$1.applyOrElse(KafkaSubscriber.scala:163)
    at akka.actor.Actor.aroundReceive(Actor.scala:535)
    at akka.actor.Actor.aroundReceive$(Actor.scala:533)
    at com.X.common.kafka.KafkaSubscriber.aroundReceive(KafkaSubscriber.scala:29)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.X.backend.DashboardExternalEntities$ExtMessage
Serialization trace:
entity (com.X.backend.QueueMessageProtocol$ExternalEntityUpdated)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
    at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
    at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
    at com.X.backend.serialization.CustomKafkaKryoDeserializer.$anonfun$deserialize$1(KafkaKryoSerialization.scala:38)
    at com.twitter.bijection.Inversion$.$anonfun$attempt$1(Inversion.scala:32)
    at scala.util.Try$.apply(Try.scala:213)
    ... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.X.backend.DashboardExternalEntities$ExtMessage
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
    ... 35 common frames omitted

基于此:https://github.com/EsotericSoftware/kryo/releases/tag/kryo-parent-4.0.0我实现了一个自定义的ScalaKryoInstantiator和相应的类来添加setOptimizedGenerics(true)

代码语言:javascript
复制
class CustomKafkaKryoDeserializer[M <: AnyRef] extends KafkaDeserializer[M] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def close(): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): M = {
    Inversion.attempt(data) {
      CustomKryoInstantiator.defaultPool.fromBytes(_)
    }.get.asInstanceOf[M]
  }
}

object CustomKryoInstantiator extends ScalaKryoInstantiator {
  private val mutex = new AnyRef with Serializable // some serializable object
  @transient private var kpool: Option[KryoPool] = None

  def defaultPool: KryoPool = mutex.synchronized {
    if (kpool.isEmpty) {
      kpool = Some(KryoPool.withByteArrayOutputStream(guessThreads, new CustomKryoInstantiator))
    }
    kpool.get
  }

  private def guessThreads: Int = {
    val cores = Runtime.getRuntime.availableProcessors
    val GUESS_THREADS_PER_CORE = 4
    GUESS_THREADS_PER_CORE * cores
  }
}

class CustomKryoInstantiator extends EmptyScalaKryoInstantiator {
  override def newKryo: KryoBase = {
    val k = super.newKryo
    k.getFieldSerializerConfig.setOptimizedGenerics(true)
    val reg = new AllScalaRegistrar
    reg(k)
    k
  }
}

但我仍然得到相同的错误。有没有办法用Kryo 4.0.2读取由Kryo 2.21序列化的消息?消息类本身并未更改。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-22 20:37:31

结果,消息包被重命名,因此Kryo无法找到正确的类。

尽管如此,即使包重命名被恢复,Kryo 4.0.2和3.0.3也无法反序列化使用Kryo 2.21序列化的消息。

总之,我们决定用Protobuf替换Kryo,并编写一个MirrorMakerMessageHandler来将Kafka消息从Chill-bijection 0.6.0 (Kryo 2.21)转换为Protobuf。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65805837

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档