我已经实现了一个Kafka Connect JDBC Source连接器,它连接到Oracle-Database,并将数据写入Kafka-Topic。目前,我已经在设置value.converter.schemas.enable=false的情况下设置了选项value.converter=org.apache.kafka.connect.json.JsonConverter。该选项允许将JSON数据写入Kafka主题(顺便说一句,它工作得很好),但不包括在将数据发送到Kafka代理之前修改数据的选项。
我现在的问题是:有没有办法修改发送到Kafka主题的数据?在我的例子中,Source Connector运行一个自定义查询,并将其直接写入Kafka主题。无论如何,我想用一些定制的列和嵌套来扩展这个JSON。有没有办法做到这一点?
发布于 2021-02-18 21:44:27
JsonConverter & schemas.enable=false :-)你在Oracle中的数据有这么好的模式,丢掉它太可惜了!说真的,使用Avro、Protobuf或JSON schema之类的东西可以在保留模式的同时,在Kafka主题中保持较小的消息大小。有关这一重要概念的更多详细信息,请参阅文章like this one。
如果您想要执行更复杂的工作,如连接、聚合等,也可以使用Kafka Streams或ksqlDB在Kafka中对数据进行流处理。
发布于 2021-03-08 21:17:17
执行一些SMT的示例配置,如重命名字段、删除字段或添加字段。
进程:
DB表-connector推断字段的模式->输入连接字段(内部连接数据结构/ connectRecord(s)) -> SMT1 -> SMT2 -> ... -> last SMT -> JsonConverter ->输出json消息。
DB表:
current_name1 | current_name2 | FieldToDrop
bla1 bla2 bla3推断的输入连接字段:
"current_name1" = "bla1" // this is a connect record
"current_name2" = "bla2" // this is a connect record
"FieldToDrop" = "bla3" // this is a connect recordvalue的输出json:
{
"new_name1": "bla1",
"new_name2": "bla2",
"type": "MyCustomType"
}连接器配置:
name=example-connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
value.converter=org.apache.kafka.connect.json.JsonConverter
...
transforms=RenameFields,InsertFieldType,DropFields
transforms.RenameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameFields.renames=current_name1:new_name1,current_name2:new_name2
transforms.InsertFieldType.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertFieldType.static.field=type
transforms.InsertFieldType.static.value=MyCustomType
transforms.DropFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.DropFields.blacklist=FieldToDrophttps://stackoverflow.com/questions/66260026
复制相似问题