我正在使用ReplyingKafkaTemplate在两个微服务之间建立一个同步调用。事件的接收方使用SendTo进行注释,如下所示:
@KafkaListener(topics = "${kafka.topic.prefix}"
+ "${kafka.topic.name}", containerFactory = "customEventKafkaListenerFactory")
@SendTo
public CustomResponseEvent onMessage(
@Payload @Valid CustomRequestEvent event, @Header(KafkaHeaders.CORRELATION_ID) String correlationId,
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) {
//Making some REST API calls to another external system here using RestTemplate
}REST调用可以抛出4xx或5xx。有多个这样的调用,有的呼叫内部系统,有的呼叫外部系统。这可能是个糟糕的设计,但我们不要再谈这个了。
我希望为RestTemplate提供一个全局异常处理程序,在这里我可以捕获所有异常,然后向事件的原始发送方返回一个响应。我使用在使用者中接收到的相同的replyTopic和correlationId来发布事件。但是,响应的接收方仍然抛出No pending reply异常。
发布于 2019-09-04 17:51:33
@KafkaListener附带了以下内容:
/**
* Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
* name to invoke if the listener method throws an exception.
* @return the error handler.
* @since 1.3
*/
String errorHandler() default "";它用于捕获和处理所有下游异常,如果返回结果,则将其发送回replyTopic。
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
logger.debug(() -> "Processing [" + message + "]");
try {
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
if (this.errorHandler != null) {
try {
Object result = this.errorHandler.handleError(message, e, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}有关更多信息,请参见RecordMessagingMessageListenerAdapter源代码。
https://stackoverflow.com/questions/57791556
复制相似问题