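I have a long-running Apache Spark Structured Streaming job on GCP Dataproc that reads data from Kafka every 10 minutes and does some processing. The Kafka topic has 3 partitions and a retention period of 3 days.
The problem I am facing is that after a few hours the program stops reading data from Kafka. If I delete the GCP bucket (which is the checkpoint directory) and then restart the streaming job, it starts consuming data again.
Here is the code, where I read data from Kafka and use foreachBatch to call the function where the processing happens: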
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("kafka.metadata.max.age.ms", "1000") \
    .option("kafka.ssl.keystore.type", "PKCS12") \
    .option("kafka.ssl.truststore.type", "PKCS12") \
    .option("maxOffsetsPerTrigger", 100000) \
    .option("max.poll.records", 500) \
    .option("max.poll.interval.ms", 1000000) \
    .load()
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("append") \
    .trigger(processingTime='3 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()
Below is a snippet of the logs:
total time take, convertToDict : 0:00:00.006695
22/08/26 17:45:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Member consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-26755c4c-93d6-4ab6-8799-411439e310bc sending LeaveGroup request to coordinator 35.185.24.226:9094 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Lost previously assigned partitions syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] (Re-)joining group
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
22/08/26 17:50:00 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] (Re-)joining group
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Finished assignment for group at generation 1: {consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1-307570e0-ca20-42d5-b4d1-255da4fca485=Assignment(partitions=[syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2])}
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Successfully joined group with generation 1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Notifying assignor about the new Assignment(partitions=[syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2])
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Adding newly assigned partitions: syslog.ueba-us4.v1.versa.demo3-0, syslog.ueba-us4.v1.versa.demo3-1, syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-0
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Found no committed offset for partition syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-1 to offset 247969068.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset 246383018.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-0 to offset 248913006.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-0
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-1
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Seeking to LATEST offset of partition syslog.ueba-us4.v1.versa.demo3-2
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-2 to offset 248534038.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-1 to offset 248185455.
22/08/26 17:50:03 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0-1, groupId=spark-kafka-source-a1532573-fad4-4127-ac20-ab9878913643-594190416-driver-0] Resetting offset for partition syslog.ueba-us4.v1.versa.demo3-0 to offset 248990456.
IN CONVERT TO DICT 84 currentTime 2022-08-26 17:50:03.995993 df -> DataFrame[value: string, timestamp: timestamp, topic: string]
before adding topic, count in batch 0
+-----+---------+-----+
|value|timestamp|topic|
+-----+---------+-----+
+-----+---------+-----+
Below are the oldest and latest offsets in the Kafka topic (from Prometheus):
Current Offset (metric : kafka_topic_partition_current_offset)
partition 0 : 249185343
partition 1 : 248380971
partition 2 : 248728475
Oldest Offset (metric : kafka_topic_partition_oldest_offset)
partition 0 : 248913006
partition 1 : 247969068
partition 2 : 248541752
This is what I see in the checkpoint bucket:
(base) Karans-MacBook-Pro:prometheus-yamls karanalang$ gsutil cat gs://ss-checkpoint-10m-noconsumergrp/offsets/96
v1
{"batchWatermarkMs":0,"batchTimestampMs":1661538780007,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
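{"syslog.ueba-us4.v1.versa.demo3":{"2":246124174,"1":245765547,"0":246582707}}
The offsets in the checkpoint bucket are lower than the oldest offsets in the topic. Is that the reason the data is not being read? What needs to be done to fix this?
TIA!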
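To make the comparison in the question concrete, here is a minimal sketch that parses the last line of the checkpoint offsets file and measures how far each partition sits below the oldest retained offset. The values are copied from the listings above; the helper name `offsets_behind_retention` is hypothetical, and the sketch assumes a single topic so that offsets can be keyed by partition number alone.

```python
import json

# Last line of the checkpoint offsets file shown above (offsets/96).
checkpoint_line = (
    '{"syslog.ueba-us4.v1.versa.demo3":'
    '{"2":246124174,"1":245765547,"0":246582707}}'
)

# Oldest retained offset per partition, copied from the
# kafka_topic_partition_oldest_offset values reported above.
oldest_offsets = {0: 248913006, 1: 247969068, 2: 248541752}


def offsets_behind_retention(checkpoint_json, oldest):
    """Return {partition: gap} for every partition whose checkpointed
    offset is lower than the oldest offset still retained by the broker.

    Hypothetical helper; assumes one topic in the checkpoint line.
    """
    gaps = {}
    for _topic, partitions in json.loads(checkpoint_json).items():
        for partition, offset in partitions.items():
            gap = oldest[int(partition)] - offset
            if gap > 0:
                gaps[int(partition)] = gap
    return gaps


gaps = offsets_behind_retention(checkpoint_line, oldest_offsets)
for partition in sorted(gaps):
    print(f"partition {partition}: checkpoint is {gaps[partition]} "
          "offsets behind the oldest retained offset")
```

All three partitions show a gap of more than two million offsets, which means the records the checkpoint points to had already been deleted from the topic.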
Also, here is the YAML used to create the Strimzi topic:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.strimzi.io/v1beta2","kind":"KafkaTopic","metadata":{"annotations":{},"labels":{"strimzi.io/cluster":"versa-kafka-gke"},"name":"syslog.ueba-us4.v1.versa.demo3","namespace":"kafka"},"spec":{"config":{"retention.ms":259200,"segment.bytes":1073741824},"partitions":3,"replicas":3}}
  creationTimestamp: "2022-07-24T18:11:32Z"
  generation: 3
  labels:
    strimzi.io/cluster: versa-kafka-gke
  name: syslog.ueba-us4.v1.versa.demo3
  namespace: kafka
  resourceVersion: "140557580"
  uid: 26c32f6f-5e4f-48ae-8026-007ab59624ec
spec:
  config:
    retention.ms: 259200
    segment.bytes: 1073741824
  partitions: 3
  replicas: 3
status:
  conditions:
  - lastTransitionTime: "2022-08-21T17:25:44.833287Z"
    status: "True"
    type: Ready
  observedGeneration: 3
  topicName: syslog.ueba-us4.v1.versa.demo3
segment.bytes = 1073741824 (i.e. ~1 GB). Is this causing older data in Kafka to be unavailable?
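Answered on 2022-08-30 19:43:17
It was a configuration issue: retention.ms was set to a very low value (259200 instead of 259200000). Setting it to the number of milliseconds equivalent to 3 days resolved the problem.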
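As a quick check of the arithmetic behind this fix, the sketch below converts 3 days to milliseconds and shows how long the misconfigured value actually retains data. The variable names are illustrative only. Note that 259200 is exactly the number of seconds in 3 days, so the value was most likely entered in seconds rather than milliseconds.

```python
from datetime import timedelta

# Value found in the KafkaTopic spec; Kafka interprets it as milliseconds.
configured_ms = 259200

# Intended retention: 3 days expressed in milliseconds.
intended_ms = int(timedelta(days=3).total_seconds() * 1000)

# How long the misconfigured topic actually keeps data.
actual_retention = timedelta(milliseconds=configured_ms)

print(f"intended retention.ms: {intended_ms}")
print(f"configured value keeps data for about {actual_retention}")
```

With the misconfigured value the topic keeps data for a little over 4 minutes, which is consistent with the checkpointed offsets having already been deleted from the topic.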
https://stackoverflow.com/questions/73505391