我正在开发一个由SpringMVC和Spring Cloud Kafka提供支持的微服务。
为简单起见,我将只关注发出HTTP请求的部分。
我有一个如下所示的绑定函数(请注意,我使用的是函数式绑定)。
@SpringBootApplication
public class ExampleApplication {
// PayloadSender uses RestTemplate to send HTTP request.
@Autowired
private PayloadSender payloadSender;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> process() {
// payloadSender.send() is a blocking call which sends payload using RestTemplate,
// once response is received it will collect all info and create "Output" object
return input -> input
.map((k,v) -> KeyValue.pair(k, payloadSender.send(v))); // "send" is a blocking call
// Question: if autoCommitOffset is set to true, would offset automatically commit right after the "map" function from KStream?
}
public static void main(String[] args) {
SpringApplication.run(ExampleApplication.class, args);
}
}从这个例子中,您可以看到payloadSender正在使用RestTemplate从输入流发送有效负载,并在接收到响应时创建" output“对象并生成到输出主题。
由于payloadSender.send()被阻塞,我担心这会导致性能问题。最重要的是,如果HTTP请求超时,我担心它会超过提交间隔(通常HTTP超时间隔比消费者提交间隔大得多),并导致kafka代理认为消费者已经死了(如果我错了,请纠正我)。
那么对于这种情况有没有更好的解决方案呢?我最终会切换到spring-reactive,但目前我需要确保MVC模型能够工作。尽管我不确定spring-reactive是否能神奇地解决这个问题。
发布于 2020-12-23 23:07:19
默认max.poll.interval为5分钟;您可以增加或减少max.poll.records。您还可以设置rest调用的超时。
https://stackoverflow.com/questions/65417369
复制相似问题