我有单节点Kafka实例,通过docker-组合在本地运行。
(系统: Mac/Arm64 64,图像: wurstmeister/kafka:2.13-2.6.0)
我想使用kafkacat (通过家用kcat安装)立即使用,生成和消费卡夫卡之间的消息。
下面是一个最小的脚本:
#!/usr/bin/env bash
NUM_MESSAGES=${1:-3} # use arg1 or use default=3
KCAT_ARGS="-q -u -c $NUM_MESSAGES -b localhost:9092 -t unbuffered"
log() { echo "$*" 1>&2; }
producer() {
log "starting producer"
for i in `seq 1 3`; do
echo "msg $i"
log "produced: msg $i"
sleep 1
done | kcat $KCAT_ARGS -P
}
consumer() {
log "starting consumer"
kcat $KCAT_ARGS -C -o end | while read line; do
log "consumed: $line"
done
}
producer&
consumer&
wait我希望(大致)得到以下输出:
starting producer
starting consumer
produced: msg 1
consumed: msg 1
produced: msg 2
consumed: msg 2
produced: msg 3
consumed: msg 3然而,即使consumer和producer都并行运行,我也只能获得输出,并将生成和使用的消息完全分批为两组:
starting producer
starting consumer
produced: msg 1
produced: msg 2
produced: msg 3
consumed: msg 1
consumed: msg 2
consumed: msg 3下面是一些kafkacat/kafka生产者属性以及我已经尝试更改生产者行为的值。
# kcat options having no effect on the test case
-u # unbuffered output
-T # act like `tee` and echo input
# kafka properties having no effect on the test case
-X queue.buffering.max.messages=1
-X queue.buffering.max.kbytes=1
-X batch.num.messages=1
-X queue.buffering.max.ms=100
-X socket.timeout.ms=100
-X max.in.flight.requests.per.connection=1
-X auto.commit.interval.ms=100
-X request.timeout.ms=100
-X message.timeout.ms=100
-X offset.store.sync.interval.ms=1
-X message.copy.max.bytes=100
-X socket.send.buffer.bytes=100
-X linger.ms=1
-X delivery.timeout.ms=100上述选项对输油管道没有任何影响。
我遗漏了什么?
编辑:这似乎是一个与kcat或图书馆员有关的问题。可能-X属性没有正确使用。
以下是当前的观察结果(将在我了解更多信息时编辑它们):
kcat将产生几批消息。它似乎是基于大小的,但不能被任何-X选项所配置。kafkacat从样条恢复。这一个似乎刷新了一个,但更早;使用较少的数据,以填补“隐藏的”缓冲区。-X选项也没有任何效果。-X属性。如果我列出范围外的值,kcat (或者可能是librdkafka)会抱怨。但是,为任何超时值和缓冲区大小值设置低值都不起作用。kcat时(这有点过分了),消息就会立即产生。问题仍然是:
我如何告诉卡夫卡管道立即产生我的第一条信息?
如果在Go中有一个例子,这也会有帮助,因为我正在使用卡夫卡的一个小型Go程序进行类似的观察。如果我能把这个问题简化为一个可张贴的格式,我可能会发布一个单独的问题。
更新:我尝试在纯Linux主机上使用bitnami映像。通过kafkacat进行生产和消费,在此系统上按预期工作。一旦我知道更多,我会发一个答复。
发布于 2022-04-01 22:19:05
我就是这样解决这个问题的。
这个问题并不在卡夫卡码头的形象中。它们都像预期的那样工作,尽管我能够通过启动kcat来攻击基于Java的Kafkas。后来我添加了rpk (RedPanda,一个非Java的"Kafka"),它在我的单个节点设置中要稳定得多。
发现
kcat,我找不到任何在没有缓冲的情况下立即生成消息的方法。众所周知,它忽略了所有的-X args。(edenhill/kcat版本1.7.0,MacOS,Arm64)kcat将刷新输出缓冲区。kcat消费消息,并且在默认情况下可以工作。Conculsion
kcat通过长时间运行的管道生成消息.https://stackoverflow.com/questions/71533406
复制相似问题