首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Alpakka AMQP :如何检测声明异常?

Alpakka AMQP :如何检测声明异常?
EN

Stack Overflow用户
提问于 2020-05-12 20:55:44
回答 1查看 105关注 0票数 0

我有一个AMQP源和AMQP Sink声明:

代码语言:javascript
复制
List<Declaration> declarations = new ArrayList<Declaration>() {{
      add(QueueDeclaration.create(sourceExchangeName));
      add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
    }};

amqpSource = AmqpSource
    .committableSource(
        NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
            .withDeclarations(declarations),
        bufferSize);

AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
    .withExchange("DEST_XCHANGE")
    .withRoutingKey("ROUTE123")
    .withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
        BuiltinExchangeType.DIRECT.getType()));

amqpSink = AmqpSink.create(amqpWriteSettings);

然后我就有了流量..。

代码语言:javascript
复制
amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)

现在,在我启动应用程序之后,发送到源队列的消息没有被消耗。后来我发现这是由于声明期间发生的错误造成的。(也就是说,当我删除.withDeclarations(..)时,它工作得很好。)在源和水槽设置中。

所以我的问题是:

  1. 如何检测AMQP源和Sink是否启动并运行良好?
  2. 如何忽略声明异常?
  3. 如果出现任何异常,我如何知道并使系统失败?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-05-15 15:25:55

为了回答1和3,AmqpSink实现了您必须保留的CompletionStage<Done>,并处理(注册一些回调函数)以观察流的失败和完成。在docs示例中,我们阻止了生产代码(https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink)中不太好的完成阶段,这可能是因为该示例包含在一个Alpakka测试中。更喜欢常用的CompletionStage回调/转换方法(例如,参见本导论)。

当发生错误、流被物化/启动或在元素处理过程中发生错误时,CompletionStage将失败,或者,在源到达结束时完成,并且每个元素都通过流进入接收器。这意味着,对于启动流,如果它不是非常快地失败,它正在运行。

对于问题2,不确定是否可能忽略声明异常,可能是这些异常总是导致连接失败。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61761740

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档