我使用Spring和Kafka,我做了一个HTTP POST请求,如下所示,并通过Kafka主题发送一些信息给另一个服务。
@RequestMapping(method = RequestMethod.POST, value = "/portfolio")
public void getPortfolio(
Authentication auth,
@RequestBody User user
) {
//Data Transfer Object
UserDTO dto = user.toDTO();
dto.setId(((AuthenticatedUser) auth.getPrincipal()).getId());
//Sending message to Kafka topic
sender.sendPortfolioRequest(dto);
}然后,我想侦听另一个主题的响应,并在HTTP响应中返回数据,但我被困在这里了。我可以使用下面的监听器方法来监听响应,但不知道如何将两者放在一起。
@KafkaListener(
topics = Topics.PORTFOLIO_RESULT,
containerFactory = "portfolioKafkaListenerContainerFactory"
)
public void portfolioListener(UserPortfolioDTO portfolio) {
System.out.println("Recieved Portfolio: " + portfolio.toString());
}附注:我刚开始使用HTTP请求,不知道这是不是我想要实现的正确方式,或者我是否应该用POST创建一个新的资源,然后重定向到那个或其他什么。
发布于 2019-01-03 23:48:37
这不能用@KafkaListener来完成,因为它是单独启动的,并且完全在它自己的线程中工作。同时,您希望在HTTP请求线程中得到回复。
对于您来说,这里唯一可能的解决方案是使用ConsumerFactory并手动使用Apache Kafka Consumer。所以,你发送,你从工厂获取一个Consumer实例,调用它的被阻塞的poll()直到得到结果,为HTTP构建一个响应并关闭Consumer。
发布于 2019-01-04 00:06:21
根据我的理解,我建议您启用asych http请求,以便能够链接您的进程。
Creating Asynchronous Methods with springboot
此选项将允许您处理sendPortfolioRequest并释放http请求(否则客户端将出现http请求超时)。
您尝试做的看起来像是一个反模式:您想要链接sychron http请求(因为您的http客户端正在等待来自服务器的响应)和一个异步消息传递系统(Kafka)。
为了能够做你想做的事情,我建议你改变你的http端点并添加一个websocket,以达到最佳实践。
https://stackoverflow.com/questions/54025437
复制相似问题