我使用JDBC连接器将数据从MySQL转移到Kafka。我感兴趣的数据来自一个选择连接的3个表,因此我用mode:incrementing和query配置了连接器
{
"name": "stats",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
"mode": "incrementing",
"validate.non.null": "false",
"topic.prefix": "t",
"incrementing.column.name": "s.id",
"transforms": "createKey,extractString",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "uuid",
"transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractString.field": "uuid",
"quote.sql.identifiers":"never",
"query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"batch.max.rows": "100",
"poll.interval.ms": "60000"
}
}在检查正在运行的连接器状态时:
curl http://conncet:8083/connectors/stats/status
{
"name": "stats",
"connector": {
"state": "RUNNING",
"worker_id": "connect-3:38083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-1:18083"
}
],
"type": "source"
}但一个小时后,我仍然没有看到创建的主题。我在MySQL中签入了使用show full processlist;运行的查询,并看到了两个类似于以下的查询:
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC因此,这个查询基本上与我在query中提供的连接器配置加上WHERE s.id > -1 ORDER BY s.id ASC中提供的查询相同,因为这个连接中的查询产生了一个有2100万行的结果集-- MySQL很长时间都在发送数据。当我再次使用show full processlist;检查时,我现在看到了4个这样的查询,然后是8个,然后是16个,依此类推。
的问题是:
s.id > -1 ORDER BY s.id ASC。"batch.max.rows": "100"是否仅控制初始投票后的批处理大小??更新:
对于这个问题,有一个开放的主题。我认为这个问题可以结束。
发布于 2019-03-29 19:05:29
使用incrementing mode并传递query的,使用以下where子句执行该查询:WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC。(如果使用增量模式和查询,就不能在那里传递where子句)。
在第一次投票中,lastIncrementedValue是-1,所以它尝试查询所有记录。提取每个记录后,lastIncrementedValue会增加,因此下一次查询只会轮询新数据。batch.max.rows指的是SourceTask::poll(...)将返回多少条记录到Kafka框架。这是批次的最大大小,将立即发送给卡夫卡。
我认为,当您从单个表获取数据时,它工作得更快,因为查询执行得更快(不那么复杂)。如果使用其他SQL工具执行这些查询,则执行类似的操作。
发布于 2020-05-07 12:55:05
在5.5中添加了query.suffix。我使用它添加了一个限制语句,它工作得很好,它只是将限制附加到查询的末尾。
请参阅问题
https://stackoverflow.com/questions/55422416
复制相似问题