我在JDBC连接器中有一个问题,但在S3连接器中也看到了这个问题。我试图了解如何确保我的连接器实际上正在消耗某个主题中的所有数据。我预计,由于冲水的大小,可能会有一定的延迟(10/15分钟)的信息消费,但我注意到,我最终有很大的延迟(天.)我的消费者总是有一些东西在抵消上的滞后。
例如,我读/看了这篇文章/视频(主要是评论):“16岁的https://rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/ https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc flush.size太低了,但如果太高了,你就得等你的文件出现在S3上,我就无聊了。”
它确实提到,如果flush.size大于可用的记录,那么可能需要时间来消耗这些记录,但我从来没有想过这会超过几分钟。如何确保所有记录都已使用,并且我真的希望避免使用flush.size =1
也许这只是我对接收器连接器的错误理解,但我确实希望它们作为正常的使用者工作,所以我希望它们使用所有的数据,并且这种类型的刷新/批处理大小将更多地基于超时和性能问题。
如果有人感兴趣,这是我的连接器配置。
对于S3接收器:
topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true对于JDBC接收器:
topics.regex: com.custom.obj_(.*)
table.name.format: ${@PREFIX@}${topic}
batch.size: 200
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
connection.url: ${@DB_URL@}
connection.user: ${@DB_USER@}
connection.password: ${@DB_PASSWORD@}
auto.create: true
auto.evolve: true
db.timezone: ${@DB_TIMEZONE@}
quote.sql.identifiers: never
transforms: kafkaMetaData
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true我已经读过这两篇文章了,但仍然不太确定:Kafka JDBC Sink连接器,批量插入值 https://github.com/confluentinc/kafka-connect-jdbc/issues/290
另外,例如,我看到了人们使用的例子(我认为这无助于我的用例),但我想知道每个连接器是否定义了这个值?我甚至有点困惑于这样一个事实:在文档中,我总是在没有使用者的情况下找到配置。但是我经常在消费者身上找到的例子。所以我想这意味着这是一个既适用于消费者又适用于生产者的通用属性?
consumer.max.interval.ms: 300000
consumer.max.poll.records: 200有人有好的反馈吗?
发布于 2021-01-25 10:29:26
关于提供的Kafka S3接收器连接器配置:
topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type:org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true有些配置字段可以调整以控制消耗\上载到S3速率。因此,减少卡夫卡的滞后性,抵消你所看到的。它的最佳做法是为配置中的以下字段使用变量。
根据个人经验,你可以做的调整是:
flush.size: 800这是(如你所说):
最大记录数:连接器的flush.size配置属性指定应该写入单个S3对象的最大记录数。此设置没有默认设置。
我更喜欢更大的文件,并使用下面的时间调整来控制消费。确保您的记录不太大或太小,无法作为flush.size * RECORD_SIZE的结果生成rational文件。
rotate.interval.ms: (i would delete this field, see rotate.schedule explanation below)即:
最大记录时间跨度:连接器的rotate.interval.ms指定文件可以保持打开并为其他记录做好准备的最大时间范围(以毫秒为单位)。
rotate.schedule.interval.ms 60000即:
计划旋转:连接器的rotate.schedule.interval.ms指定文件可以保持打开的最大时间间隔(以毫秒为单位),并为其他记录做好准备。与rotate.interval.ms不同的是,与预定的旋转不同,每个文件的时间戳从写入第一条记录到文件的系统时间开始。只要在rotate.schedule.interval.ms指定的时间范围内处理记录,该记录就会被写入文件。一旦记录在当前文件的timespan之后被处理,文件就会被刷新,上传到S3,并提交文件中记录的偏移量。使用以当前系统时间开始的timespan创建一个新文件,并将记录写入该文件。提交将在预定时间执行,而不考虑以前提交的时间或消息的数量。当您必须根据当前服务器时间提交数据时,例如在每小时开始时,此配置非常有用。默认值-1表示此功能被禁用。
您使用默认的-1,这意味着禁用此旋转。这一调整将发挥最大的作用,因为每个任务都将消耗更多的资源。
关于问题的第二部分:
您可以通过将度量添加到您的kafka并使用例如prometheus和grafana进行连接来获得可观察性。下面的配置指南在源中。

资料来源:
https://stackoverflow.com/questions/65663750
复制相似问题