尝试通过R中的rkafka库返回消息。
遵循相同的rkafka文档@ https://cran.r-project.org/web/packages/rkafka/vignettes/rkafka.pdf
输出返回"",其中不包含实际的消息。Kafka工具确认消息是生产者发送的。
代码:
prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test")
print(rkafka.read(consumer1))输出:
[1] ""所需的输出将返回"Testing once"。
发布于 2018-09-12 13:45:35
为了读取已经写入主题的主题消息(在消费者启动之前),您需要将偏移量设置为尽可能小的值(相当于--from-beginning)。根据rkafka docs,autoOffseetReset参数默认为largest
autoOffsetReset
最小:自动将偏移量重置为最小偏移量
最大:自动将偏移量重置为最大偏移量
任何其他内容:向消费者抛出异常anything :可选
类型:字符串
默认值:最大
为了能够使用消息,您需要将autoOffsetReset设置为"smallest"。
consumer1=rkafka.createConsumer("127.0.0.1:2181","test", autoOffsetReset="smallest")发布于 2018-09-12 17:50:27
更新:此代码有效:
library(rkafka)
prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing twice")
rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test",groupId = "test-consumer-
group",zookeeperConnectionTimeoutMs = "100000",autoCommitEnable = "NULL",
autoCommitInterval = "NULL",autoOffsetReset = "NULL")
print(rkafka.read(consumer1))
print(rkafka.readPoll(consumer1))
rkafka.closeConsumer(consumer1)关键是删除Kafka生成的日志后重启Kafka。
https://stackoverflow.com/questions/52287916
复制相似问题