我正在使用Alpakka及其JMS连接器将数据从Oracle AQ中排出队列。通过遵循这指南,我可以在下面给出非常基本的实现。
我的问题是如何使它具有事务性,这样我就可以保证如果抛出异常,我的消息不会丢失。
object ConsumerApp extends App {
implicit val system: ActorSystem = ActorSystem("actor-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource())
val out = JmsSource.textSource(
JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)
val sink = Sink.foreach { message: String =>
println("in sink: " + message)
throw new Exception("") // !!! MESSAGE IS LOST !!!
}
out.runWith(sink, materializer)
}如果是PL/SQL,解决方案将如下所示:
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW (44);
msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
DBMS_AQ.dequeue (
queue_name => 'My_Queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle
);
-- do something with the message
COMMIT;
END;发布于 2017-12-06 22:18:49
流阶段失败时的默认行为是关闭整个流。您必须决定如何在流中使用处理错误。例如,一种方法是使用退避策略对流进行重新启动。
另外,由于您使用的是Alpakka连接器,所以请将确认模式设置为ClientAcknowledge (这在Alpakka 0.15之后可用)。使用此配置,未被确认的消息可以通过JMS源再次传递。例如:
val jmsSource: Source[Message, NotUsed] = JmsSource(
JmsSourceSettings(connectionFactory)
.withQueue("My_Queue")
.withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
)
val result = jmsSource
.map {
case textMessage: TextMessage =>
val text = textMessage.getText
textMessage.acknowledge()
text
}
.runForeach(println)https://stackoverflow.com/questions/47681475
复制相似问题