首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Alpakka的无限AMQP消费者

使用Alpakka的无限AMQP消费者
EN

Stack Overflow用户
提问于 2019-03-14 23:48:26
回答 1查看 160关注 0票数 0

我正在尝试用Alpakka实现一个连接到AMQP代理的非常简单的服务。我只想让它在消息被推送到给定的交换/主题时,将其队列中的消息作为流使用。

在我的测试中似乎一切正常,但是当我尝试启动我的服务时,我意识到我的流只使用了我的消息一次,然后退出。

基本上我使用的是Alpakka文档中的代码:

代码语言:javascript
复制
def consume()={
    val amqpSource = AmqpSource.committableSource(
      TemporaryQueueSourceSettings(connectionProvider, exchangeName)
        .withDeclaration(exchangeDeclaration)
        .withRoutingKey(topic),
      bufferSize = prefetchCount
    )

    val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))

    amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}

我尝试每秒钟安排一次consume()执行,但我遇到了OutOfMemoryException问题。

有没有什么合适的方法让这段代码以无限循环的形式运行?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-15 00:23:25

如果您希望在Source出现故障或被取消时重新启动它,请用RestartSource.withBackoff包装它。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55166783

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档