我有一个复杂的技术栈。我正在利用Netflix提供GraphQL服务。幕后有一堆JMS组件,它们从各种服务中发送和接收数据。除了GraphQL订阅之外,我拥有所有的功能。
具体来说,我要做的是为来自一个GraphQL主题的消息创建一个ActiveMQ订阅。
所以我有一个SubscriptionDataFetcher,如下所示:
@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {
private final Publisher<SurveyResult> surveyResultsReactiveSource;
@Autowired
public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
this.surveyResultsReactiveSource = surveyResultsReactiveSource;
}
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
return surveyResultsReactiveSource;
}
}在我的Spring配置中,我使用了以下Spring集成流:
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return Flux.from(
IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher())
.map((message) -> converter.fromMessage(message, SurveyResult.class));
}我要说几件事:
@JmsListener来接收这些消息,主题是@JmsListener之后,也会将Mongo反应性Spring数据存储库连接到GraphQL订阅,数据由客户端接收。当我将客户端连接到订阅时,我会看到以下日志:
PublishSubscribeChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler : Subscription started for 1我怀疑在建立web套接字连接时消息侦听器容器没有被激活。我应该“激活”频道适配器吗?我遗漏了什么?
技术人员:
// spring boots - version 2.4.3
implementation "org.springframework.boot:spring-boot-starter-web"
implementation "org.springframework.boot:spring-boot-starter-activemq"
implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
implementation "org.springframework.boot:spring-boot-starter-integration"
implementation 'org.springframework.boot:spring-boot-starter-security'
// spring integration
implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'
// dgs
implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'更新1:
值得注意的是,如果我将订阅更新为以下内容,则会在客户端获得结果。
@DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
public Publisher<SurveyResult> surveyResults() {
// repository is a ReactiveMongoRepository
return repository.findAll();
}更新2:
这是最后确定的bean,以防它在接受的解决方案基础上帮助某人。我需要的是一个主题,而不是队列上的侦听器。
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
SurveyResultMessageConverter converter = new SurveyResultMessageConverter();
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
.jmsMessageConverter(converter))
.toReactivePublisher();
}发布于 2021-04-20 20:55:01
你的问题是:
return Flux.from(
IntegrationFlows.from(框架没有看到内部的IntegrationFlow实例来正确地解析和注册。
要使它正常工作,您需要考虑将IntegrationFlow声明为顶级bean。
就像这样:
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
.log(LoggingHandler.Level.DEBUG)
.transform([transform to SurveyResult])
.toReactivePublisher();
}现在,框架知道这个逻辑IntegrationFlow容器必须被解析,所有bean都必须注册和启动。
如果无法提供带有转换器的SurveyResultMessageConverter,则可能需要将您的transform()逻辑重新考虑为普通的Jms.messageDrivenChannelAdapter。
那么在您的SurveyResultsSubscriptionDataFetcher中,您只需要拥有:
return surveyResultsReactiveSource.map(Message::getPayload);https://stackoverflow.com/questions/67184573
复制相似问题