首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡消费者抵消出口黄金-夏尔马或合流-卡夫卡-围棋自由

卡夫卡消费者抵消出口黄金-夏尔马或合流-卡夫卡-围棋自由
EN

Stack Overflow用户
提问于 2022-04-29 13:45:32
回答 2查看 705关注 0票数 1

我正试图找到一种对消费者群体执行抵消重置操作的方法,例如在Kafka命令中是这样的:

代码语言:javascript
复制
kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current --export | tee topic-offset.csv
代码语言:javascript
复制
kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg2 --to-current

然后根据导出文件导入新的偏移量?

代码语言:javascript
复制
kafka-consumer-groups.sh --bootstrap-server $kfk --execute --reset-offsets --topic $t --group $cg2 --from-file topic-offset.csv

从文件导出的导入不是问题.只是找不到办法得到然后设置偏移量..。

那么,是否有人使用sharmaconfluent-kafka-go库来玩这个呢?

(谢谢你事先提出的建议:)

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-05-24 04:28:09

下面是一些简单的up,它在YMAL配置的基础上进行偏移复位

https://github.com/nXnUs25/kfk-offsets

用于滞后监视使用者组列表和偏移重置的命令行工具。

我们有相同的抵消…现在,为了模拟这个过程,我们将生成主题消息,并继续使用其中一个消费者组propertest-cg1a11,我们将生成5条消息并在该消费者组中使用它们,这将给我们提供我们所消费的信息。

^CProcessed a total of 33 messages 28 +5

代码语言:javascript
复制
❯ ./kfkgo lag
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg1a11                        propertest                               0               183             183             0
propertest-cg1a11                        propertest                               1               165             165             0
propertest-cg1a11                        propertest                               2               192             192             0
propertest-cg1a11                        propertest                               3               177             177             0
propertest-cg1a11                        propertest                               4               192             192             0
propertest-cg1a11                        propertest                               5               169             169             0
propertest-cg1a11                        propertest                               6               180             180             0
propertest-cg1a11                        propertest                               7               164             164             0
propertest-cg1a11                        propertest                               8               195             195             0
propertest-cg1a11                        propertest                               9               188             188             0
propertest-cg1a11                        propertest                               10              184             184             0
propertest-cg1a11                        propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        0

❯ ./kfkgo lag -g propertest-cg -t propertest
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg                            propertest                               0               179             183             4
propertest-cg                            propertest                               1               162             165             3
propertest-cg                            propertest                               2               190             192             2
propertest-cg                            propertest                               3               174             177             3
propertest-cg                            propertest                               4               187             192             5
propertest-cg                            propertest                               5               167             169             2
propertest-cg                            propertest                               6               177             180             3
propertest-cg                            propertest                               7               160             164             4
propertest-cg                            propertest                               8               192             195             3
propertest-cg                            propertest                               9               185             188             3
propertest-cg                            propertest                               10              183             184             1
propertest-cg                            propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        33

现在,我们将偏移量从propertest-cg再次移回propertest-cg1a11,这将允许我们在CG上处理相同的消息。

代码语言:javascript
复制
❯ ./kfkgo offset -m
Using config file: ~/kfk-offsets/kfk-offset.yaml
moving

和再次核查:

卡夫卡命令:kafka-consumer-groups.sh

代码语言:javascript
复制
propertest-cg1a11 propertest      0          179             183             4               -               -               -
propertest-cg1a11 propertest      1          162             165             3               -               -               -
propertest-cg1a11 propertest      2          190             192             2               -               -               -
propertest-cg1a11 propertest      3          174             177             3               -               -               -
propertest-cg1a11 propertest      4          187             192             5               -               -               -
propertest-cg1a11 propertest      5          167             169             2               -               -               -
propertest-cg1a11 propertest      6          177             180             3               -               -               -
propertest-cg1a11 propertest      7          160             164             4               -               -               -
propertest-cg1a11 propertest      8          192             195             3               -               -               -
propertest-cg1a11 propertest      9          185             188             3               -               -               -
propertest-cg1a11 propertest      10         183             184             1               -               -               -
propertest-cg1a11 propertest      11         184             184             0               -               -               -


Consumer group 'propertest-cg' has no active members.
propertest-cg   propertest      0          179             183             4               -               -               -
propertest-cg   propertest      1          162             165             3               -               -               -
propertest-cg   propertest      2          190             192             2               -               -               -
propertest-cg   propertest      3          174             177             3               -               -               -
propertest-cg   propertest      4          187             192             5               -               -               -
propertest-cg   propertest      5          167             169             2               -               -               -
propertest-cg   propertest      6          177             180             3               -               -               -
propertest-cg   propertest      7          160             164             4               -               -               -
propertest-cg   propertest      8          192             195             3               -               -               -
propertest-cg   propertest      9          185             188             3               -               -               -
propertest-cg   propertest      10         183             184             1               -               -               -
propertest-cg   propertest      11         184             184             0               -               -               -

❯ ./kfkgo lag -g propertest-cg -t propertest
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg                            propertest                               0               179             183             4
propertest-cg                            propertest                               1               162             165             3
propertest-cg                            propertest                               2               190             192             2
propertest-cg                            propertest                               3               174             177             3
propertest-cg                            propertest                               4               187             192             5
propertest-cg                            propertest                               5               167             169             2
propertest-cg                            propertest                               6               177             180             3
propertest-cg                            propertest                               7               160             164             4
propertest-cg                            propertest                               8               192             195             3
propertest-cg                            propertest                               9               185             188             3
propertest-cg                            propertest                               10              183             184             1
propertest-cg                            propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        33

❯ ./kfkgo lag
Using config file: ~/kfk-offsets/kfk-offset.yaml
GROUP                                    TOPIC                                    PARTITION       CURRENT-OFFSET  LOG-END-OFFSET  LAG
propertest-cg1a11                        propertest                               0               179             183             4
propertest-cg1a11                        propertest                               1               162             165             3
propertest-cg1a11                        propertest                               2               190             192             2
propertest-cg1a11                        propertest                               3               174             177             3
propertest-cg1a11                        propertest                               4               187             192             5
propertest-cg1a11                        propertest                               5               167             169             2
propertest-cg1a11                        propertest                               6               177             180             3
propertest-cg1a11                        propertest                               7               160             164             4
propertest-cg1a11                        propertest                               8               192             195             3
propertest-cg1a11                        propertest                               9               185             188             3
propertest-cg1a11                        propertest                               10              183             184             1
propertest-cg1a11                        propertest                               11              184             184             0
TOTAL LAG:                                                                                                                        33

自述中的更多示例

票数 0
EN

Stack Overflow用户

发布于 2022-04-29 19:51:27

好的,我想我发现它只是需要实现完整的解决方案,但是我应该对

代码语言:javascript
复制
func main() {
    brokers := []string{BK}
    kfk.Logger = log.New(os.Stdout, "", log.LstdFlags)

    cfg := kfk.NewConfig()
    cfg.ClientID = CID
    client, _ := kfk.NewClient(brokers, cfg)
    //fmt.Println(client)
    offsetMg, _ := kfk.NewOffsetManagerFromClient(CG, client)
    defer offsetMg.Close()

    consumer, _ := kfk.NewConsumerFromClient(client)
    defer consumer.Close()

    partitions, _ := consumer.Partitions(TOPIC)

    for _, p := range partitions {

        pom, _ := offsetMg.ManagePartition(TOPIC, p)
        ofs, pomStr := pom.NextOffset()
        fmt.Printf("Partition: %v -> nextOffset: %v:%s\n", p, ofs, pomStr)

    }
    fmt.Println("--")

}

这给了我这个输出:

代码语言:javascript
复制
Partition: 0 -> nextOffset: 31:
Partition: 1 -> nextOffset: 30:
Partition: 2 -> nextOffset: 45:
Partition: 3 -> nextOffset: 39:
Partition: 4 -> nextOffset: 45:
Partition: 5 -> nextOffset: 39:
Partition: 6 -> nextOffset: 37:
Partition: 7 -> nextOffset: 42:
Partition: 8 -> nextOffset: 43:
Partition: 9 -> nextOffset: 35:
Partition: 10 -> nextOffset: 41:
Partition: 11 -> nextOffset: 36:

它与from java命令完全相同:

代码语言:javascript
复制
❯ kafka-consumer-groups.sh --bootstrap-server $kfk --dry-run --reset-offsets --topic $t --group $cg1 --to-current | sort -k3 -n

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
propertest-cg1                 propertest                     0          31
propertest-cg1                 propertest                     1          30
propertest-cg1                 propertest                     2          45
propertest-cg1                 propertest                     3          39
propertest-cg1                 propertest                     4          45
propertest-cg1                 propertest                     5          39
propertest-cg1                 propertest                     6          37
propertest-cg1                 propertest                     7          42
propertest-cg1                 propertest                     8          43
propertest-cg1                 propertest                     9          35
propertest-cg1                 propertest                     10         41
propertest-cg1                 propertest                     11         36

所以现在只剩下把这些数据导出到一个文件并使用函数。

代码语言:javascript
复制
// ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows to
    // reset an offset to an earlier or smaller value, where MarkOffset only
    // allows incrementing the offset. cf MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)

设置新的偏移..。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72058964

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档