首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka连接JDBC连接器查询+增量模式在初始轮询时使用大型数据集

Kafka连接JDBC连接器查询+增量模式在初始轮询时使用大型数据集
EN

Stack Overflow用户
提问于 2019-03-29 17:11:37
回答 2查看 3.4K关注 0票数 3

我使用JDBC连接器将数据从MySQL转移到Kafka。我感兴趣的数据来自一个选择连接的3个表,因此我用mode:incrementingquery配置了连接器

代码语言:javascript
复制
{
    "name": "stats",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
        "mode": "incrementing",
        "validate.non.null": "false",
        "topic.prefix": "t",
        "incrementing.column.name": "s.id",
        "transforms": "createKey,extractString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "uuid",
        "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractString.field": "uuid",
        "quote.sql.identifiers":"never",
        "query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "batch.max.rows": "100",
        "poll.interval.ms": "60000"
    }
}

在检查正在运行的连接器状态时:

代码语言:javascript
复制
curl http://conncet:8083/connectors/stats/status

{
    "name": "stats",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect-3:38083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect-1:18083"
        }
    ],
    "type": "source"
}

但一个小时后,我仍然没有看到创建的主题。我在MySQL中签入了使用show full processlist;运行的查询,并看到了两个类似于以下的查询:

代码语言:javascript
复制
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC

因此,这个查询基本上与我在query中提供的连接器配置加上WHERE s.id > -1 ORDER BY s.id ASC中提供的查询相同,因为这个连接中的查询产生了一个有2100万行的结果集-- MySQL很长时间都在发送数据。当我再次使用show full processlist;检查时,我现在看到了4个这样的查询,然后是8个,然后是16个,依此类推。

的问题是:

  1. 为什么卡夫卡连接试图让所有的行同时添加:s.id > -1 ORDER BY s.id ASC
  2. 是否可以将连接器配置为不执行此操作,而只获取较小的数量?
  3. "batch.max.rows": "100"是否仅控制初始投票后的批处理大小??

更新:

对于这个问题,有一个开放的主题。我认为这个问题可以结束。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-03-29 19:05:29

使用incrementing mode并传递query的,使用以下where子句执行该查询:WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC。(如果使用增量模式和查询,就不能在那里传递where子句)。

在第一次投票中,lastIncrementedValue是-1,所以它尝试查询所有记录。提取每个记录后,lastIncrementedValue会增加,因此下一次查询只会轮询新数据。batch.max.rows指的是SourceTask::poll(...)将返回多少条记录到Kafka框架。这是批次的最大大小,将立即发送给卡夫卡。

我认为,当您从单个表获取数据时,它工作得更快,因为查询执行得更快(不那么复杂)。如果使用其他SQL工具执行这些查询,则执行类似的操作。

票数 1
EN

Stack Overflow用户

发布于 2020-05-07 12:55:05

在5.5中添加了query.suffix。我使用它添加了一个限制语句,它工作得很好,它只是将限制附加到查询的末尾。

请参阅问题

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55422416

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档