我只想跟着一个演示尝试使用卡夫卡在围棋。我可以成功地生成萨拉马的消息,但是当我想要消费这个消息时,我无法得到它。
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// kafka consumer
func main() {
consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
if err != nil {
fmt.Printf("fail to start consumer, err:%v\n", err)
return
}
partitionList, err := consumer.Partitions("test")
if err != nil {
fmt.Printf("fail to get list of partition:err%v\n", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList {
pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)
}
}代码的返回是
[0]
-1但实际上,我可以通过卡夫卡-控制台-消费者获得信息。
发布于 2021-12-14 09:21:08
我相信你不是在等消息来.
下面是您的代码中包含的问题列表:
defer pc.AsyncClose()会在函数退出时触发,而不是exit. go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
}
}(pc)go func(sarama.PartitionConsumer) { --这只是类型。go func(pc sarama.PartitionConsumer) {.删除goroutine,如果您想要以hello world为例,只需检查使用者通道。
https://stackoverflow.com/questions/70345720
复制相似问题