首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >alpakka jms客户确认模式交付保证

alpakka jms客户确认模式交付保证
EN

Stack Overflow用户
提问于 2022-08-22 14:34:09
回答 1查看 36关注 0票数 0

我有一个alpakka源-> kafka接收器类似于一个流。我正在查看alpakka jms消费者文档,并试图找出这能为我提供什么样的交付保证。

来自https://doc.akka.io/docs/alpakka/current/jms/consumer.html

代码语言:javascript
复制
val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { ackEnvelope =>
      ackEnvelope.acknowledge()
      ackEnvelope.message
    }
    .runWith(Sink.seq)

我希望这实际上是信息只有在沉船成功后才会被破坏(至少一次交付保证),但我不能依赖于一厢情愿的想法。

鉴于alpakka似乎没有利用任何一种在重新启动过程中持续存在的状态,我想不出我如何才能准确地得到--一旦在这里保证了阿拉·弗林克,但我至少可以指望--至少一次,或者我必须(以某种方式)插入卡夫卡制片人flexiFlow (https://doc.akka.io/docs/alpakka-kafka/current/producer.html#producer-as-a-flow)的地图。

谢谢你,Fil

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-08-22 15:19:56

在该流中,在将消息添加到物化序列之前,以及在result可以对您进行任何操作(即Future完成)之前,就会发生ack。因此,它将是at-most-once

要将ack延迟到某些处理成功,最简单的方法是将您正在处理的消息保存在流中,而不是实现未来。Alpakka Kafka生产者支持传递元素,它可以是JMS消息:

代码语言:javascript
复制
val topic: String = ???

def javaxMessageToKafkaMessage[Key, Value](
  ae: AckEnvelope,
  kafkaKeyFor: javax.jms.Message => Key,
  kafkaValueFor: javax.jms.Message => Value
): ProducerMessage.Envelope[Key, Value, javax.jms.Message] = {
  val key = kafkaKeyFor(ae.message)
  val value = kafkaValueFor(ae.message)
  ProducerMessage.single(new ProducerRecord(topic, key, value), ae)
}

// types K and V are unspecified...
jmsSource
  .map(
    javaxMessageToKafkaMessage[K, V](
      _,
      { _ => ??? },
      { _ => ??? }
    )
  )
  .via(Producer.flexiFlow(producerSettings))
  .to(
    Sink.foreach { results =>
      val msg = results.passThrough
      msg.acknowledge()
    }
  )(Keep.both)

处理此流的run将以带有Future[Done]JmsConsumerControl的元组形式出现。由于不熟悉JMS,我不知道使用者控件的shutdown将如何与a交互。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73446847

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档