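I am trying to reset a Kafka consumer group offset, but the reset has no effect.
The current offset is 6. I tried resetting it to the earliest offset and also tried shifting it back by 1, but in both cases the offset is not reset and still shows 6.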
kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --describe
Consumer group 'EDWOFFSETGROUP_24' has no active members.
TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
EDW_TOTALS_JSON1  0          6               6               0    -            -     -
Approach #1
kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --topic EDW_TOTALS_JSON1:0 --reset-offsets --shift-by -1
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.
[2019-12-04 14:43:14,364] WARN New offset (5) is lower than earliest offset for topic partition EDW_TOTALS_JSON1-0. Value will be set to 6 (kafka.admin.ConsumerGroupCommand$)
TOPIC             PARTITION  NEW-OFFSET
EDW_TOTALS_JSON1  0          6
Approach #2
kafka-consumer-groups --bootstrap-server <server>:9092 --group EDWOFFSETGROUP_24 --topic EDW_TOTALS_JSON1 --reset-offsets --to-earliest --execute
TOPIC             PARTITION  NEW-OFFSET
EDW_TOTALS_JSON1  0          6
Answer, posted on 2019-12-04 23:36:22
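It looks like the data has been purged by the retention policy. Once messages have been truncated, the earliest available offset moves forward to the new start position shown in the topic partition details, so a reset cannot go below it.
See the definitions of the retention settings.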
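To make the arithmetic behind the WARN line concrete, here is a minimal sketch of the range check the reset tool appears to apply: the requested offset is pulled back into the range between the earliest and latest offsets. The function name clamp_offset and its argument order are hypothetical, and the earliest offset of 6 is inferred from the WARN message in the question rather than measured.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): mimics the range check that
# kafka-consumer-groups reports in its WARN line.
# Args: current offset, shift, earliest offset, latest (log end) offset.
clamp_offset() {
  requested=$(( $1 + $2 ))
  if [ "$requested" -lt "$3" ]; then
    echo "$3"    # below the earliest retained offset: raised to earliest
  elif [ "$requested" -gt "$4" ]; then
    echo "$4"    # beyond the log end: lowered to latest
  else
    echo "$requested"
  fi
}

# Values from the question: current 6, --shift-by -1, earliest 6 (assumed
# from the WARN line), log end 6.
clamp_offset 6 -1 6 6
```

With these inputs the requested offset is 5, which is below the earliest retained offset, so the result stays at 6. That matches what both approaches in the question reported.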
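You can verify this by reading the topic from the beginning with a fresh consumer:
bin/kafka-console-consumer.sh --zookeeper <zk_host>:2181 --topic test --from-beginning
If you are using the new consumer API, use this instead:
bin/kafka-console-consumer.sh --bootstrap-server <server>:9092 --topic EDW_TOTALS_JSON1 --from-beginning
You can also use GetOffsetShell to check the offset details of each partition.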
bin/kafka-run-class.sh kafka.tools.GetOffsetShell
required argument [broker-list], [topic]
Option                                   Description
------                                   -----------
--broker-list <hostname:port,...,        REQUIRED: The list of hostname and
  hostname:port>                           port of the server to connect to.
--max-wait-ms <Integer: ms>              The max amount of time each fetch
                                           request waits. (default: 1000)
--offsets <Integer: count>               number of offsets returned (default: 1)
--partitions <partition ids>             comma separated list of partition ids.
                                           If not specified, will find offsets
                                           for all partitions (default)
--time <Long: timestamp in milliseconds  timestamp; offsets will come before
  / -1(latest) / -2 (earliest)>            this timestamp, as in
                                           getOffsetsBefore
--topic <topic>                          REQUIRED: The topic to get offsets from.
Example
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic vital_signs --time -1
https://stackoverflow.com/questions/59185831
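As a rough way to apply this to the topic from the question, the sketch below compares the earliest and latest offsets and prints how many messages are still retained. It assumes GetOffsetShell prints one line per partition in the form topic:partition:offset. The helper name retained_count is hypothetical, and the sample lines are assumed from the question's output rather than taken from a real run.

```shell
#!/bin/sh
# Hypothetical helper (not part of Kafka): given one earliest line and one
# latest line in the assumed "topic:partition:offset" format, print how many
# messages are still retained in that partition.
retained_count() {
  earliest=${1##*:}   # keep only the text after the last colon
  latest=${2##*:}
  echo $(( latest - earliest ))
}

# The two lines would come from commands such as (not run here):
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list <server>:9092 --topic EDW_TOTALS_JSON1 --time -2
#   kafka-run-class kafka.tools.GetOffsetShell --broker-list <server>:9092 --topic EDW_TOTALS_JSON1 --time -1
# Sample values below are assumed from the question's output, not measured.
retained_count "EDW_TOTALS_JSON1:0:6" "EDW_TOTALS_JSON1:0:6"
```

A result of 0 would mean that nothing is left in the partition to rewind to, which is consistent with the offset staying at 6.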