首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka JDBC源连接器:从列值创建主题

Kafka JDBC源连接器:从列值创建主题
EN

Stack Overflow用户
提问于 2019-12-07 04:51:24
回答 1查看 346关注 0票数 1

我有一个微服务,它使用OracleDB在EVENT_STORE表中发布系统更改。表EVENT_STORE包含一个带有事件类型名称的列TYPE

JDBC Source Kafka Connect有可能接受EVENT_STORE表的更改,并将它们与KAFKA-TOPIC中的列TYPE的值一起发布。

这是我的源kafka连接器配置:

代码语言:javascript
复制
{
  "name": "kafka-connector-source-ms-name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@localhost:1521:xe",
    "connection.user": "squeme-name",
    "connection.password": "password",
    "topic.prefix": "",
    "table.whitelist": "EVENT_STORE",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "CREATE_AT",
    "incrementing.column.name": "ID",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.action.reload": "restart",
    "errors.retry.timeout": "0",
    "errors.retry.delay.max.ms": "60000",
    "errors.tolerance": "none",
    "errors.log.enable": "false",
    "errors.log.include.messages": "false",
    "connection.attempts": "3",
    "connection.backoff.ms": "10000",
    "numeric.precision.mapping": "false",
    "validate.non.null": "true",
    "quote.sql.identifiers": "ALWAYS",
    "table.types": "TABLE",
    "poll.interval.ms": "5000",
    "batch.max.rows": "100",
    "table.poll.interval.ms": "60000",
    "timestamp.delay.interval.ms": "0",
    "db.timezone": "UTC"
  }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-12-07 08:18:03

您可以尝试使用ExtractTopic转换从字段中提取主题名称

将以下属性添加到JSON

代码语言:javascript
复制
transforms=ValueFieldExample
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=TYPE
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59220164

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档