首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >kafka connect源连接器如何使用timestamp或timestamp+incrementing模式?

kafka connect源连接器如何使用timestamp或timestamp+incrementing模式?
EN

Stack Overflow用户
提问于 2019-05-10 17:58:55
回答 1查看 1.6K关注 0票数 4

我有一个数据库(Mariadb)关系,它的"modified“列是"bigint(10)”,它代表一个时间戳,我相信unix时间格式。当我尝试运行模式为"timestamp“或"timestamp+incrementing”的kafka源连接器时,没有事件被推送到主题中。如果我只运行递增,新的条目将被推送到主题。有人能提示我在哪里配置了错误的连接器吗?或者连接器不能识别unix时间格式的时间戳?

我尝试使用以下属性运行连接器(仅基于时间戳进行检索):

代码语言:javascript
复制
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name":"only_ts",
        "config": {
            "numeric.mapping": "best_fit",
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://mariadb/moodle",
            "connection.user": "user",
            "connection.password": "",
            "topic.prefix": "only_ts_",
            "mode": "timestamp", 
            "timestamp.column.name":"modified", 
            "table.whitelist":"mdl_forum_posts",
            "poll.intervals.ms": 10000
        }
  }'

每当我创建或更新条目时,我都希望看到来自"mdl_forum_posts“的条目被推送到kafka主题"only_ts_mdl_forum_posts”中。但是,使用此连接器时,什么都不会发生。如果我只使用“递增”模式,这就可以很好地工作,并且与预期的一样。但是为了获得数据库更新,我需要添加模式时间戳。

"describe mdl_forum_posts“的输出

代码语言:javascript
复制
+---------------+--------------+------+-----+---------+----------------+

| Field         | Type         | Null | Key | Default | Extra          |

+---------------+--------------+------+-----+---------+----------------+

| id            | bigint(10)   | NO   | PRI | NULL    | auto_increment |

| discussion    | bigint(10)   | NO   | MUL | 0       |                |

| parent        | bigint(10)   | NO   | MUL | 0       |                |

| userid        | bigint(10)   | NO   | MUL | 0       |                |

| created       | bigint(10)   | NO   | MUL | 0       |                |

| modified      | bigint(10)   | NO   |     | 0       |                |

| mailed        | tinyint(2)   | NO   | MUL | 0       |                |

| subject       | varchar(255) | NO   |     |         |                |

| message       | longtext     | NO   |     | NULL    |                |

| messageformat | tinyint(2)   | NO   |     | 0       |                |

| messagetrust  | tinyint(2)   | NO   |     | 0       |                |

| attachment    | varchar(100) | NO   |     |         |                |

| totalscore    | smallint(4)  | NO   |     | 0       |                |

| mailnow       | bigint(10)   | NO   |     | 0       |                |

| deleted       | tinyint(1)   | NO   |     | 0       |                |

+---------------+--------------+------+-----+---------+----------------+

和"show create table moodle.mdl_forum_posts;“的输出:

代码语言:javascript
复制
| mdl_forum_posts | CREATE TABLE mdl_forum_posts (

  id bigint(10) NOT NULL AUTO_INCREMENT,

  discussion bigint(10) NOT NULL DEFAULT '0',

  parent bigint(10) NOT NULL DEFAULT '0',

  userid bigint(10) NOT NULL DEFAULT '0',

  created bigint(10) NOT NULL DEFAULT '0',

  modified bigint(10) NOT NULL DEFAULT '0',

  mailed tinyint(2) NOT NULL DEFAULT '0',

  subject varchar(255) NOT NULL DEFAULT '',

  message longtext NOT NULL,

  messageformat tinyint(2) NOT NULL DEFAULT '0',

  messagetrust tinyint(2) NOT NULL DEFAULT '0',

  attachment varchar(100) NOT NULL DEFAULT '',

  totalscore smallint(4) NOT NULL DEFAULT '0',

  mailnow bigint(10) NOT NULL DEFAULT '0',

  deleted tinyint(1) NOT NULL DEFAULT '0',

  PRIMARY KEY (id),

  KEY mdl_forupost_use_ix (userid),

  KEY mdl_forupost_cre_ix (created),

  KEY mdl_forupost_mai_ix (mailed),

  KEY mdl_forupost_dis_ix (discussion),

  KEY mdl_forupost_par_ix (parent)

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |

"modified“列中的一个示例条目是:

代码语言:javascript
复制
select modified from mdl_forum_posts;
1557487199

这是unix时间中的时间戳,如下所示:

代码语言:javascript
复制
select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59

关于相关连接器的相关日志(只有时间戳)似乎显示了一些查询?

代码语言:javascript
复制
kafka-connect_1    | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1    | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)
EN

回答 1

Stack Overflow用户

发布于 2020-10-30 18:57:59

我也有同样的问题。我唯一的变通方法就是这里提到的:https://github.com/confluentinc/kafka-connect-jdbc/issues/566。这意味着unix时间戳(bigint)列的timestamp模式可以与自定义查询一起使用。你只需要使用你自己的where clause。例如,在您的示例中,它可能是这样的:

代码语言:javascript
复制
SELECT id 
FROM mdl_forum_posts
WHERE to_timestamp(modified/1000) > ? AND to_timestamp(modified/1000) < ? ORDER BY modified ASC
--

to_timestamp是DB方言中的日期转换函数。请注意允许注释自动生成的where clause--

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56075201

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档