我有下面的代码。
@Incoming("my-topic")
void process(String someEvent) {
String someResponse = assuminglyRealFastReactiveClientCall();
}上述代码引发阻塞线程异常。这是用@Blocking修正的。
@Incoming("my-topic")
@Blocking
void process(String someEvent) {
String someResponse = assuminglyRealFastReactiveClientCall();
}如果我把String assuminglyRealFastReactiveClientCall()切换到Uni<String> assuminglyRealFastReactiveClientCall()
我猜消费者方法必须切换到手动的ack策略,并且消息需要根据订阅的结果进行加/加,所以呢?
@Incoming("my-topic")
void process(Message<String> someEvent) {
assuminglyRealFastReactiveClientCall()
.subscribe().with(s -> {
System.out.println("Response: " + s);
event.ack();
}, t -> event.nack(t));
}发布于 2022-06-14 17:12:48
@Incoming("my-topic")
Uni<Void> process(Message<String> someEvent) {
return assuminglyRealFastReactiveClientCall()
.invoke(this::handleResponse)
.chain(response -> Uni.createFrom().completionStage(someEvent.ack()));
}
private void handleResponse(String response) {
// Do something with the response
}Smallrye反应性消息传递文档中的消费信息段落还有更多的示例。
https://stackoverflow.com/questions/72620075
复制相似问题