首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rkafka.read()不返回消息(仅返回双引号)

rkafka.read()不返回消息(仅返回双引号)
EN

Stack Overflow用户
提问于 2018-09-12 13:17:12
回答 2查看 360关注 0票数 1

尝试通过R中的rkafka库返回消息。

遵循相同的rkafka文档@ https://cran.r-project.org/web/packages/rkafka/vignettes/rkafka.pdf

输出返回"",其中不包含实际的消息。Kafka工具确认消息是生产者发送的。

代码:

代码语言:javascript
复制
prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test")
print(rkafka.read(consumer1))

输出:

代码语言:javascript
复制
[1] ""

所需的输出将返回"Testing once"

EN

回答 2

Stack Overflow用户

发布于 2018-09-12 13:45:35

为了读取已经写入主题的主题消息(在消费者启动之前),您需要将偏移量设置为尽可能小的值(相当于--from-beginning)。根据rkafka docsautoOffseetReset参数默认为largest

autoOffsetReset

最小:自动将偏移量重置为最小偏移量

最大:自动将偏移量重置为最大偏移量

任何其他内容:向消费者抛出异常anything :可选

类型:字符串

默认值:最大

为了能够使用消息,您需要将autoOffsetReset设置为"smallest"

代码语言:javascript
复制
consumer1=rkafka.createConsumer("127.0.0.1:2181","test", autoOffsetReset="smallest")
票数 0
EN

Stack Overflow用户

发布于 2018-09-12 17:50:27

更新:此代码有效:

代码语言:javascript
复制
library(rkafka)

prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing twice")

rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test",groupId = "test-consumer- 
group",zookeeperConnectionTimeoutMs = "100000",autoCommitEnable = "NULL", 
autoCommitInterval = "NULL",autoOffsetReset = "NULL")


print(rkafka.read(consumer1))
print(rkafka.readPoll(consumer1))
rkafka.closeConsumer(consumer1)

关键是删除Kafka生成的日志后重启Kafka。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52287916

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档