首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >应答Kafka模板-异常处理

应答Kafka模板-异常处理
EN

Stack Overflow用户
提问于 2019-09-04 15:27:52
回答 1查看 1.2K关注 0票数 0

我正在使用ReplyingKafkaTemplate在两个微服务之间建立一个同步调用。事件的接收方使用SendTo进行注释,如下所示:

代码语言:javascript
复制
@KafkaListener(topics = "${kafka.topic.prefix}"
        + "${kafka.topic.name}", containerFactory = "customEventKafkaListenerFactory")
@SendTo
public CustomResponseEvent onMessage(
        @Payload @Valid CustomRequestEvent event, @Header(KafkaHeaders.CORRELATION_ID) String correlationId,
        @Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) {

   //Making some REST API calls to another external system here using RestTemplate
}

REST调用可以抛出4xx或5xx。有多个这样的调用,有的呼叫内部系统,有的呼叫外部系统。这可能是个糟糕的设计,但我们不要再谈这个了。

我希望为RestTemplate提供一个全局异常处理程序,在这里我可以捕获所有异常,然后向事件的原始发送方返回一个响应。我使用在使用者中接收到的相同的replyTopiccorrelationId来发布事件。但是,响应的接收方仍然抛出No pending reply异常。

  1. 不管我上面有什么方法,是否有可能实现这样一个中心错误响应事件发布者?
  2. 是否有其他最适合此异常处理的替代方案?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-09-04 17:51:33

@KafkaListener附带了以下内容:

代码语言:javascript
复制
/**
 * Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
 * name to invoke if the listener method throws an exception.
 * @return the error handler.
 * @since 1.3
 */
String errorHandler() default "";

它用于捕获和处理所有下游异常,如果返回结果,则将其发送回replyTopic

代码语言:javascript
复制
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
    logger.debug(() -> "Processing [" + message + "]");
    try {
        Object result = invokeHandler(record, acknowledgment, message, consumer);
        if (result != null) {
            handleResult(result, record, message);
        }
    }
    catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
        if (this.errorHandler != null) {
            try {
                Object result = this.errorHandler.handleError(message, e, consumer);
                if (result != null) {
                    handleResult(result, record, message);
                }
            }
            catch (Exception ex) {
                throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
                        "Listener error handler threw an exception for the incoming message",
                        message.getPayload()), ex);
            }
        }
        else {
            throw e;
        }
    }

有关更多信息,请参见RecordMessagingMessageListenerAdapter源代码。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57791556

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档