首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用kafkacat向Kafka发送未经缓冲的消息

使用kafkacat向Kafka发送未经缓冲的消息
EN

Stack Overflow用户
提问于 2022-03-18 21:35:16
回答 1查看 854关注 0票数 1

我有单节点Kafka实例,通过docker-组合在本地运行。

(系统: Mac/Arm64 64,图像: wurstmeister/kafka:2.13-2.6.0)

我想使用kafkacat (通过家用kcat安装)立即使用生成和消费卡夫卡之间的消息。

下面是一个最小的脚本:

代码语言:javascript
复制
#!/usr/bin/env bash

NUM_MESSAGES=${1:-3}  # use arg1 or use default=3
KCAT_ARGS="-q -u -c $NUM_MESSAGES -b localhost:9092 -t unbuffered"

log() { echo "$*" 1>&2; }

producer() {
    log "starting producer"
    for i in `seq 1 3`; do
        echo "msg $i"
        log "produced: msg $i"
        sleep 1
    done | kcat $KCAT_ARGS -P
}

consumer() {
    log "starting consumer"
    kcat $KCAT_ARGS -C -o end | while read line; do
        log "consumed: $line"
    done
}

producer&
consumer&
wait

我希望(大致)得到以下输出:

代码语言:javascript
复制
starting producer
starting consumer
produced: msg 1
consumed: msg 1
produced: msg 2
consumed: msg 2
produced: msg 3
consumed: msg 3

然而,即使consumerproducer都并行运行,我也只能获得输出,并将生成和使用的消息完全分批为两组:

代码语言:javascript
复制
starting producer
starting consumer
produced: msg 1
produced: msg 2
produced: msg 3
consumed: msg 1
consumed: msg 2
consumed: msg 3

下面是一些kafkacat/kafka生产者属性以及我已经尝试更改生产者行为的值。

代码语言:javascript
复制
# kcat options having no effect on the test case
-u  # unbuffered output
-T  # act like `tee` and echo input

# kafka properties having no effect on the test case
-X queue.buffering.max.messages=1
-X queue.buffering.max.kbytes=1
-X batch.num.messages=1
-X queue.buffering.max.ms=100
-X socket.timeout.ms=100
-X max.in.flight.requests.per.connection=1
-X auto.commit.interval.ms=100
-X request.timeout.ms=100
-X message.timeout.ms=100
-X offset.store.sync.interval.ms=1
-X message.copy.max.bytes=100
-X socket.send.buffer.bytes=100
-X linger.ms=1
-X delivery.timeout.ms=100

上述选项对输油管道没有任何影响。

我遗漏了什么?

编辑:这似乎是一个与kcat或图书馆员有关的问题。可能-X属性没有正确使用。

以下是当前的观察结果(将在我了解更多信息时编辑它们):

  • 当发送更大的有效负载10000条消息而脚本中延迟较小时,kcat将产生几批消息。它似乎是基于大小的,但不能被任何-X选项所配置。
  • 然后,批次也被消费者正确地捡起。因此,它肯定是一个生产者发行的
  • 我还尝试了这个脚本在码头与当前的kafkacat从样条恢复。这一个似乎刷新了一个,但更早;使用较少的数据,以填补“隐藏的”缓冲区。-X选项也没有任何效果。
  • 此外,似乎检查了-X属性。如果我列出范围外的值,kcat (或者可能是librdkafka)会抱怨。但是,为任何超时值和缓冲区大小值设置低值都不起作用。
  • 当为每条消息调用kcat时(这有点过分了),消息就会立即产生。

问题仍然是:

我如何告诉卡夫卡管道立即产生我的第一条信息?

如果在Go中有一个例子,这也会有帮助,因为我正在使用卡夫卡的一个小型Go程序进行类似的观察。如果我能把这个问题简化为一个可张贴的格式,我可能会发布一个单独的问题。

更新:我尝试在纯Linux主机上使用bitnami映像。通过kafkacat进行生产和消费,在此系统上按预期工作。一旦我知道更多,我会发一个答复。

EN

回答 1

Stack Overflow用户

发布于 2022-04-01 22:19:05

我就是这样解决这个问题的。

这个问题并不在卡夫卡码头的形象中。它们都像预期的那样工作,尽管我能够通过启动kcat来攻击基于Java的Kafkas。后来我添加了rpk (RedPanda,一个非Java的"Kafka"),它在我的单个节点设置中要稳定得多。

发现

  • 使用kcat,我找不到任何在没有缓冲的情况下立即生成消息的方法。众所周知,它忽略了所有的-X args。(edenhill/kcat版本1.7.0,MacOS,Arm64)
  • 发送单个消息是有效的。当关闭输入管道时,kcat将刷新输出缓冲区。
  • 可以立即通过kcat消费消息,并且在默认情况下可以工作。
  • 其他卡夫卡客户没有这个问题。我创建了一个小的卡夫卡-围棋例子,它按照预期工作;默认情况下没有广泛的缓冲。

Conculsion

  • 不要使用kcat通过长时间运行的管道生成消息.
  • 使用卡夫卡或类似的客户端事件进行小型健康检查和其他“脚本”。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71533406

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档