
Deserializing an Avro message

Stack Overflow user
Asked 2020-02-01 21:45:24
2 answers · 5K views · 0 followers · 3 votes

I deployed Kafka from here. In addition, I added a Postgres container to docker-compose.yml as follows:

```yaml
postgres:
    image: postgres
    hostname: kafka-postgres
    container_name: kafka-postgres
    depends_on:
      - ksql-server
      - broker
      - schema-registry
      - connect
    ports:
      - 5432:5432
```

I created a topic named pageviews.

Then I created a DatagenConnector with the following settings and started it:

```json
{
  "name": "datagen-pageviews",
  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "kafka.topic": "pageviews",
  "max.interval": "100",
  "iterations": "999999999",
  "quickstart": "pageviews"
}
```

As far as I can see, the connector registered the following schema for the topic:

```json
{
  "type": "record",
  "name": "pageviews",
  "namespace": "ksql",
  "fields": [
    {
      "name": "viewtime",
      "type": "long"
    },
    {
      "name": "userid",
      "type": "string"
    },
    {
      "name": "pageid",
      "type": "string"
    }
  ],
  "connect.name": "ksql.pageviews"
}
```

The next step was to create a JdbcSinkConnector to transfer the data from the Kafka topic to a Postgres table. That worked. The connector settings:

```json
{
  "name": "from-kafka-to-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "pageviews"
  ],
  "connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
  "connection.user": "postgres",
  "connection.password": "********",
  "auto.create": "true",
  "auto.evolve": "true"
}
```

Then I tried to send messages to this topic myself, but that failed with the following error:

```text
2020-02-01 21:16:11,750 Error encountered in task from-kafka-to-pg-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='pageviews', partition=0, offset=23834, timestamp=1580591160374, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
```

So the way the messages are sent matters. This is how I did it (Python, confluent-kafka-python):

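```python
import json
from confluent_kafka import Producer

topic = 'pageviews'

def kafka_delivery_report(err, msg):  # stand-in for the asker's callback
    print('Delivery failed: {}'.format(err) if err else 'Delivered')

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
    'viewtime': 123,
    'userid': 'user_1',
    'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)  # plain JSON bytes
producer.flush()
```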

Maybe I should send the schema along with the message (AvroProducer)?


2 answers

Stack Overflow user

Accepted answer

Answered 2020-02-02 00:11:45

The topic (more precisely, the sink connector's AvroConverter) expects messages in Avro format.

AvroProducer from confluent-kafka-python does exactly that:

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


# Name and namespace match the schema Datagen registered (ksql.pageviews);
# a differently named record may be rejected by Schema Registry's
# compatibility check for the existing pageviews-value subject.
value_schema_str = """
{
   "namespace": "ksql",
   "name": "pageviews",
   "type": "record",
   "fields" : [
     {
       "name" : "viewtime",
       "type" : "long"
     },
     {
       "name" : "userid",
       "type" : "string"
     },
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "ksql",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "pageid",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
# The records must provide the fields declared in the schemas above
value = {"viewtime": 123, "userid": "user_1", "pageid": "page_1"}
key = {"pageid": "page_1"}


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


avroProducer = AvroProducer({
    'bootstrap.servers': 'mybroker,mybroker2',  # placeholder broker list
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://schema_registry_host:port'  # placeholder URL
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='pageviews', value=value, key=key)
avroProducer.flush()
```
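For reference, a minimal stdlib-only sketch of roughly what AvroProducer puts on the wire for a pageviews value: the Confluent wire format is a 0x00 magic byte, a 4-byte big-endian Schema Registry id, then the Avro binary encoding of the record (longs as zig-zag varints, strings as a length-prefixed UTF-8 byte string, fields in schema order). The helper names and the schema id 1 are hypothetical; in practice Schema Registry assigns the id and AvroProducer does all of this for you. The point is only to show why plain JSON bytes cannot be read by AvroConverter.

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent Schema Registry message


def zigzag_varint(n):
    """Encode an Avro long: zig-zag mapping, then base-128 varint."""
    n = (n << 1) ^ (n >> 63)  # zig-zag: 0->0, -1->1, 1->2, -2->3, ...
    out = bytearray()
    while n & ~0x7F:  # more than 7 bits left: emit a byte with continuation bit
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out)


def encode_string(s):
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode('utf-8')
    return zigzag_varint(len(data)) + data


def encode_pageview(record):
    """Avro binary body for the ksql.pageviews schema (fields in schema order)."""
    return (zigzag_varint(record['viewtime'])
            + encode_string(record['userid'])
            + encode_string(record['pageid']))


def confluent_frame(schema_id, payload):
    """Prefix the Avro body with the magic byte and a 4-byte big-endian schema id."""
    return struct.pack('>bI', MAGIC_BYTE, schema_id) + payload


# Hypothetical schema id; the real one is assigned by Schema Registry
message = confluent_frame(1, encode_pageview(
    {'viewtime': 123, 'userid': 'user_1', 'pageid': 'page_1'}))
```

Hand-rolling the encoding like this is only useful for understanding the format; for real producers the schema-aware serializers of confluent-kafka-python are the safer choice.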
Votes: 2

Stack Overflow user

Answered 2020-02-02 00:06:16

The problem is that the sink uses the Avro converter to read data from a topic whose messages are not Avro.
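AvroConverter checks the first byte of every message for the magic byte 0x00 before it looks up the schema id. A JSON value like the one produced in the question starts with '{' (0x7B), hence "Unknown magic byte!". A small sketch (the helper name is hypothetical) for telling Confluent-Avro messages apart from other ones, for example when inspecting msg.value() of records polled with confluent_kafka.Consumer:

```python
import struct


def confluent_schema_id(value):
    """Return the Schema Registry id if `value` (bytes) starts with the
    Confluent wire-format header, otherwise None."""
    # Header: magic byte 0x00 followed by a 4-byte big-endian schema id
    if value is None or len(value) < 5 or value[0] != 0:
        return None
    (schema_id,) = struct.unpack('>I', value[1:5])
    return schema_id


json_value = b'{"viewtime": 123, "userid": "user_1", "pageid": "page_1"}'
print(confluent_schema_id(json_value))  # None: first byte is '{' (0x7B)
```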

There are two possible solutions:

1. Switch the Kafka Connect sink connector to use the correct converter

For example, if you are consuming JSON data from the Kafka topic into a Kafka Connect sink:

```properties
...
value.converter=org.apache.kafka.connect.json.JsonConverter
# true if each message embeds its schema, false for plain JSON
value.converter.schemas.enable=true
...
```

Whether value.converter.schemas.enable should be true or false depends on whether the messages embed a schema.
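With schemas.enable=true, JsonConverter expects every message to be an envelope with a "schema" and a "payload" field. As far as I know the JDBC sink needs a value schema to create and fill the table, so for JSON messages like the ones in the question this is the variant that applies. A sketch of building such a message for the pageviews fields; the helper name and the struct name ksql.pageviews are assumptions:

```python
import json


def pageview_envelope(viewtime, userid, pageid):
    """Wrap a pageviews record in the schema/payload envelope that
    JsonConverter expects when value.converter.schemas.enable=true."""
    return {
        "schema": {
            "type": "struct",
            "name": "ksql.pageviews",  # assumed name, mirroring the Avro schema
            "optional": False,
            "fields": [
                {"field": "viewtime", "type": "int64", "optional": False},
                {"field": "userid", "type": "string", "optional": False},
                {"field": "pageid", "type": "string", "optional": False},
            ],
        },
        "payload": {"viewtime": viewtime, "userid": userid, "pageid": pageid},
    }


message = json.dumps(pageview_envelope(123, "user_1", "page_1")).encode("utf8")
```

The resulting bytes can be sent with Producer.produce exactly as in the question, once the sink connector uses JsonConverter with schemas.enable=true.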

2. Change the upstream format to Avro

To make the DatagenConnector write messages to Kafka in Avro format, set the value.converter and value.converter.schema.registry.url parameters:

```json
...
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
...
```
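One way to apply these settings to the existing datagen-pageviews connector is the Kafka Connect REST API: PUT /connectors/<name>/config creates the connector or replaces its configuration. A sketch that only builds the request with urllib; the worker URL http://localhost:8083 and the helper name are assumptions about the docker-compose setup, and the config mirrors the question's Datagen settings plus the two converter properties.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed Kafka Connect REST endpoint


def build_config_update(name, config, connect_url=CONNECT_URL):
    """Build a PUT /connectors/<name>/config request; Connect creates the
    connector if it does not exist, or replaces its config otherwise."""
    return urllib.request.Request(
        "{}/connectors/{}/config".format(connect_url, name),
        data=json.dumps(config).encode("utf8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


datagen_config = {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "kafka.topic": "pageviews",
    "max.interval": "100",
    "iterations": "999999999",
    "quickstart": "pageviews",
}
request = build_config_update("datagen-pageviews", datagen_config)
# urllib.request.urlopen(request) would send it to a running Connect worker
```

The PUT endpoint is convenient here because it is idempotent: running it again with the same config leaves the connector unchanged.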

For details, see the kafka-connect-datagen documentation.

There is also a great article on Kafka Connect converters and serialization.

Votes: 1
Page content originally provided by Stack Overflow; translation support by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/60021343
