Strimzi + Structured Streaming: offsets not available (is this because of segment.bytes)?

Stack Overflow user
Asked 2022-08-26 18:52:58

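I have a long-running Apache Spark Structured Streaming job on GCP Dataproc that reads data from Kafka every 10 minutes and does some processing. The Kafka topic has 3 partitions and a retention period of 3 days.

The issue I'm facing is that after a few hours the program stops reading data from Kafka. If I delete the GCS bucket (which is the checkpoint directory) and restart the streaming job, it starts consuming data again.

Below is the code, where I read data from Kafka and use foreachBatch to call the function in which the processing happens: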

df_stream = spark.readStream.format('kafka') \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password) \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("failOnDataLoss", "false") \
        .option("kafka.metadata.max.age.ms", "1000") \
        .option("kafka.ssl.keystore.type", "PKCS12") \
        .option("kafka.ssl.truststore.type", "PKCS12") \
        .option("maxOffsetsPerTrigger", 100000) \
        .option("max.poll.records", 500) \
        .option("max.poll.interval.ms", 1000000) \
        .load()



query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
        .outputMode("append") \
        .trigger(processingTime='3 minutes') \
        .option("truncate", "false") \
        .option("checkpointLocation", checkpoint) \
        .foreachBatch(convertToDictForEachBatch) \
        .start()

Below is a snippet of the logs:

 total time take, convertToDict :  0:00:00.006695
22/08/26 17:45:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Member consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-26755c4c-93d6-4ab6-8799-411439e310bc sending LeaveGroup request to coordinator 35.185.24.226:9094 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Lost previously assigned partitions syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] (Re-)joining group
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] (Re-)joining group
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Finished assignment for group at generation 1: {consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-307570e0-ca20-42d5-b4d1-255da4fca485=Assignment(partitions=[syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2])}
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Successfully joined group with generation 1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Notifying assignor about the new Assignment(partitions=[syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2])
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Adding newly assigned partitions: syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-0
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-1 to offset 247969068.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset 246383018.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-0 to offset 248913006.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-0
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset 248534038.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-1 to offset 248185455.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-0 to offset 248990456.
 IN CONVERT TO DICT  84  currentTime  2022-08-26 17:50:03.995993  df ->  DataFrame[value: string, timestamp: timestamp, topic: string]
 before adding topic, count in batch  0
+-----+---------+-----+
|value|timestamp|topic|
+-----+---------+-----+
+-----+---------+-----+

Here are the oldest and latest offsets in the Kafka topic (from Prometheus):

Current Offset (metric : kafka_topic_partition_current_offset)
partition 0 : 249185343
partition 1 : 248380971
partition 2 : 248728475

Oldest Offset (metric : kafka_topic_partition_oldest_offset)
partition 0 : 248913006
partition 1 : 247969068
partition 2 : 248541752

This is what I see in the checkpoint bucket:

(base) Karans-MacBook-Pro:prometheus-yamls karanalang$ gsutil cat gs://ss-checkpoint-10m-noconsumergrp/offsets/96 
v1
{"batchWatermarkMs":0,"batchTimestampMs":1661538780007,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"syslog.ueba-us4.v1.versa.demo3":{"2":246124174,"1":245765547,"0":246582707}}

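The offsets in the checkpoint bucket are lower than the oldest offsets in the topic. Is that the reason the data is not being read? What needs to be done to fix this?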

TIA!
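
The comparison between the checkpointed offsets and the oldest retained offsets quoted above can be sketched in plain Python. This is a minimal sketch, assuming the last line of the checkpoint's offsets file is the per-topic JSON shown above and that the oldest offsets are copied by hand from the Prometheus metric; the helper name `find_expired_partitions` is hypothetical and is not part of Spark or Kafka.

```python
import json


def find_expired_partitions(checkpoint_line, topic, oldest_offsets):
    """Return {partition: missing_count} for every partition whose
    checkpointed offset is below the oldest offset still on the broker."""
    checkpointed = json.loads(checkpoint_line)[topic]
    expired = {}
    for partition, offset in checkpointed.items():
        # Checkpoint JSON keys are strings; the metric is keyed by int here.
        oldest = oldest_offsets[int(partition)]
        if offset < oldest:
            expired[int(partition)] = oldest - offset
    return expired


# Values copied from the question (checkpoint file and Prometheus metric).
topic = "syslog.ueba-us4.v1.versa.demo3"
checkpoint_line = (
    '{"syslog.ueba-us4.v1.versa.demo3":'
    '{"2":246124174,"1":245765547,"0":246582707}}'
)
oldest_offsets = {0: 248913006, 1: 247969068, 2: 248541752}

expired = find_expired_partitions(checkpoint_line, topic, oldest_offsets)
for partition in sorted(expired):
    print(f"partition {partition}: {expired[partition]} records no longer available")
```

With the values from the question, all three partitions are reported, which means the checkpoint points at records the broker has already deleted.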

Also, here is the YAML used to create the Strimzi topic:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaTopic","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"versa-kafka-gke"},"name":"syslog.ueba-us4.v1.versa.demo3","namespace":"kafka"},"spec":{"config":{"retention.ms":259200,"segment.bytes":1073741824},"partitions":3,"replicas":3}}
  creationTimestamp: "2022-07-24T18:11:32Z"
  generation: 3
  labels:
    strimzi.io/cluster: versa-kafka-gke
  name: syslog.ueba-us4.v1.versa.demo3
  namespace: kafka
  resourceVersion: "140557580"
  uid: 26c32f6f-5e4f-48ae-8026-007ab59624ec
spec:
  config:
    retention.ms: 259200
    segment.bytes: 1073741824
  partitions: 3
  replicas: 3
status:
  conditions:
  - lastTransitionTime: "2022-08-21T17:25:44.833287Z"
    status: "True"
    type: Ready
  observedGeneration: 3
  topicName: syslog.ueba-us4.v1.versa.demo3

segment.bytes = 1073741824 (i.e. ~1 GB). Is this what is causing the older data in Kafka to be unavailable?


1 Answer

Stack Overflow user

Accepted answer

Answered 2022-08-30 19:43:17

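It was a configuration issue: retention.ms was set to a very low value (259200 instead of 259200000). Setting it to the number of milliseconds equivalent to 3 days resolved the issue.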
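
The arithmetic behind this fix can be checked quickly. The following is a minimal sketch using only Python's standard library; the variable names are illustrative and do not come from the original post. It relies on the fact that Kafka interprets retention.ms in milliseconds.

```python
from datetime import timedelta

# Value from the KafkaTopic spec in the question, read as milliseconds.
configured_ms = 259200
configured = timedelta(milliseconds=configured_ms)

# Intended retention of 3 days, expressed in milliseconds.
intended_ms = timedelta(days=3) // timedelta(milliseconds=1)

print(f"configured retention: {configured}")      # a little over 4 minutes
print(f"intended retention.ms: {intended_ms}")    # 259200000
```

The configured value is exactly 1000 times too small: 259200 is the number of seconds in 3 days, not the number of milliseconds. In the KafkaTopic spec this corresponds to setting retention.ms to 259200000.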

Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/73505391
