我正在尝试用Alpakka实现一个连接到AMQP代理的非常简单的服务。我只想让它在消息被推送到给定的交换/主题时,将其队列中的消息作为流使用。
在我的测试中似乎一切正常,但是当我尝试启动我的服务时,我意识到我的流只使用了我的消息一次,然后退出。
基本上我使用的是Alpakka文档中的代码:
def consume()={
val amqpSource = AmqpSource.committableSource(
TemporaryQueueSourceSettings(connectionProvider, exchangeName)
.withDeclaration(exchangeDeclaration)
.withRoutingKey(topic),
bufferSize = prefetchCount
)
val amqpSink = AmqpSink.replyTo(AmqpReplyToSinkSettings(connectionProvider))
amqpSource.mapAsync(4)(msg => onMessage(msg)).runWith(amqpSink)
}我尝试每秒钟安排一次consume()执行,但我遇到了OutOfMemoryException问题。
有没有什么合适的方法让这段代码以无限循环的形式运行?
发布于 2019-03-15 00:23:25
如果您希望在Source出现故障或被取消时重新启动它,请用RestartSource.withBackoff包装它。
https://stackoverflow.com/questions/55166783
复制相似问题