我有一个alpakka源-> kafka接收器类似于一个流。我正在查看alpakka jms消费者文档,并试图找出这能为我提供什么样的交付保证。
来自https://doc.akka.io/docs/alpakka/current/jms/consumer.html
val result: Future[immutable.Seq[javax.jms.Message]] =
jmsSource
.take(msgsIn.size)
.map { ackEnvelope =>
ackEnvelope.acknowledge()
ackEnvelope.message
}
.runWith(Sink.seq)我希望这实际上是信息只有在沉船成功后才会被破坏(至少一次交付保证),但我不能依赖于一厢情愿的想法。
鉴于alpakka似乎没有利用任何一种在重新启动过程中持续存在的状态,我想不出我如何才能准确地得到--一旦在这里保证了阿拉·弗林克,但我至少可以指望--至少一次,或者我必须(以某种方式)插入卡夫卡制片人flexiFlow (https://doc.akka.io/docs/alpakka-kafka/current/producer.html#producer-as-a-flow)的地图。
谢谢你,Fil
发布于 2022-08-22 15:19:56
在该流中,在将消息添加到物化序列之前,以及在result可以对您进行任何操作(即Future完成)之前,就会发生ack。因此,它将是at-most-once。
要将ack延迟到某些处理成功,最简单的方法是将您正在处理的消息保存在流中,而不是实现未来。Alpakka Kafka生产者支持传递元素,它可以是JMS消息:
val topic: String = ???
def javaxMessageToKafkaMessage[Key, Value](
ae: AckEnvelope,
kafkaKeyFor: javax.jms.Message => Key,
kafkaValueFor: javax.jms.Message => Value
): ProducerMessage.Envelope[Key, Value, javax.jms.Message] = {
val key = kafkaKeyFor(ae.message)
val value = kafkaValueFor(ae.message)
ProducerMessage.single(new ProducerRecord(topic, key, value), ae)
}
// types K and V are unspecified...
jmsSource
.map(
javaxMessageToKafkaMessage[K, V](
_,
{ _ => ??? },
{ _ => ??? }
)
)
.via(Producer.flexiFlow(producerSettings))
.to(
Sink.foreach { results =>
val msg = results.passThrough
msg.acknowledge()
}
)(Keep.both)处理此流的run将以带有Future[Done]的JmsConsumerControl的元组形式出现。由于不熟悉JMS,我不知道使用者控件的shutdown将如何与a交互。
https://stackoverflow.com/questions/73446847
复制相似问题