首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将Kafka Connect JDBC源连接器输出转换为自定义格式

将Kafka Connect JDBC源连接器输出转换为自定义格式
EN

Stack Overflow用户
提问于 2021-02-18 20:31:12
回答 2查看 598关注 0票数 0

我已经实现了一个Kafka Connect JDBC Source连接器,它连接到Oracle-Database,并将数据写入Kafka-Topic。目前,我已经在设置value.converter.schemas.enable=false的情况下设置了选项value.converter=org.apache.kafka.connect.json.JsonConverter。该选项允许将JSON数据写入Kafka主题(顺便说一句,它工作得很好),但不包括在将数据发送到Kafka代理之前修改数据的选项。

我现在的问题是:有没有办法修改发送到Kafka主题的数据?在我的例子中,Source Connector运行一个自定义查询,并将其直接写入Kafka主题。无论如何,我想用一些定制的列和嵌套来扩展这个JSON。有没有办法做到这一点?

EN

回答 2

Stack Overflow用户

发布于 2021-02-18 21:44:27

  1. 请不要使用JsonConverter & schemas.enable=false :-)你在Oracle中的数据有这么好的模式,丢掉它太可惜了!说真的,使用Avro、Protobuf或JSON schema之类的东西可以在保留模式的同时,在Kafka主题中保持较小的消息大小。

有关这一重要概念的更多详细信息,请参阅文章like this one

  1. 单消息转换(SMT)可能是您正在寻找的将数据转换到Kafka的方法。例如,您可以insert fieldsflatten payloadslots more。如果没有现有的SMT可以执行您想要的操作,您可以使用Java API编写自己的SMT。

如果您想要执行更复杂的工作,如连接、聚合等,也可以使用Kafka Streams或ksqlDB在Kafka中对数据进行流处理。

票数 3
EN

Stack Overflow用户

发布于 2021-03-08 21:17:17

执行一些SMT的示例配置,如重命名字段、删除字段或添加字段。

进程:

DB表-connector推断字段的模式->输入连接字段(内部连接数据结构/ connectRecord(s)) -> SMT1 -> SMT2 -> ... -> last SMT -> JsonConverter ->输出json消息。

DB表:

代码语言:javascript
复制
current_name1 | current_name2 | FieldToDrop
bla1            bla2            bla3

推断的输入连接字段:

代码语言:javascript
复制
  "current_name1" = "bla1" // this is a connect record
  "current_name2" = "bla2" // this is a connect record
  "FieldToDrop" =  "bla3" // this is a connect record

value的输出json:

代码语言:javascript
复制
{
  "new_name1": "bla1",
  "new_name2": "bla2",
  "type": "MyCustomType"
}

连接器配置:

代码语言:javascript
复制
name=example-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter 
... 

transforms=RenameFields,InsertFieldType,DropFields
    
    
transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.RenameFields.renames=current_name1:new_name1,current_name2:new_name2
    
    transforms.InsertFieldType.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.InsertFieldType.static.field=type
    transforms.InsertFieldType.static.value=MyCustomType
    
    transforms.DropFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value 
    transforms.DropFields.blacklist=FieldToDrop
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66260026

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档