首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何创建对GraphQL主题的DGS ActiveMQ订阅

如何创建对GraphQL主题的DGS ActiveMQ订阅
EN

Stack Overflow用户
提问于 2021-04-20 18:41:12
回答 1查看 585关注 0票数 1

我有一个复杂的技术栈。我正在利用Netflix提供GraphQL服务。幕后有一堆JMS组件,它们从各种服务中发送和接收数据。除了GraphQL订阅之外,我拥有所有的功能。

具体来说,我要做的是为来自一个GraphQL主题的消息创建一个ActiveMQ订阅。

所以我有一个SubscriptionDataFetcher,如下所示:

代码语言:javascript
复制
@DgsComponent
public class SurveyResultsSubscriptionDataFetcher {

    private final Publisher<SurveyResult> surveyResultsReactiveSource;

    @Autowired
    public SurveyResultsSubscriptionDataFetcher(Publisher<SurveyResult> surveyResultsReactiveSource) {
        this.surveyResultsReactiveSource = surveyResultsReactiveSource;
    }

    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        return surveyResultsReactiveSource;
    }
}

在我的Spring配置中,我使用了以下Spring集成流:

代码语言:javascript
复制
@Bean
@Scope(BeanDefinition.SCOPE_PROTOTYPE)
public Publisher<SurveyResult> surveyResultsReactiveSource() {
    SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

    return Flux.from(
            IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .log()
                    .toReactivePublisher())
            .map((message) -> converter.fromMessage(message, SurveyResult.class));
}

我要说几件事:

  1. I有一个单独的@JmsListener来接收这些消息,主题是
  2. I do not --查看多个使用者,即使在web套接字连接为@JmsListener之后,也会将Mongo反应性Spring数据存储库连接到GraphQL订阅,数据由客户端接收。

当我将客户端连接到订阅时,我会看到以下日志:

代码语言:javascript
复制
PublishSubscribeChannel      : Channel 'unknown.channel.name' has 1 subscriber(s).
DgsWebSocketHandler          : Subscription started for 1

我怀疑在建立web套接字连接时消息侦听器容器没有被激活。我应该“激活”频道适配器吗?我遗漏了什么?

技术人员:

代码语言:javascript
复制
    // spring boots - version 2.4.3
    implementation "org.springframework.boot:spring-boot-starter-web"
    implementation "org.springframework.boot:spring-boot-starter-activemq"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive"
    implementation "org.springframework.boot:spring-boot-starter-integration"
    implementation 'org.springframework.boot:spring-boot-starter-security'

    // spring integration
    implementation group: 'org.springframework.integration', name: 'spring-integration-jms', version: '5.4.4'

    // dgs
    implementation "com.netflix.graphql.dgs:graphql-dgs-spring-boot-starter:3.10.2"
    implementation 'com.netflix.graphql.dgs:graphql-dgs-subscriptions-websockets-autoconfigure:3.10.2'

更新1:

值得注意的是,如果我将订阅更新为以下内容,则会在客户端获得结果。

代码语言:javascript
复制
    @DgsData(parentType = DgsConstants.SUBSCRIPTION.TYPE_NAME, field = DgsConstants.SUBSCRIPTION.SurveyResultStream)
    public Publisher<SurveyResult> surveyResults() {
        // repository is a ReactiveMongoRepository
        return repository.findAll();
    }

更新2:

这是最后确定的bean,以防它在接受的解决方案基础上帮助某人。我需要的是一个主题,而不是队列上的侦听器。

代码语言:javascript
复制
    @Bean
    public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {
        SurveyResultMessageConverter converter = new SurveyResultMessageConverter();

        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
                Jms.container(connectionFactory(), surveyDestination).pubSubDomain(true))
                .jmsMessageConverter(converter))
                .toReactivePublisher();
    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-20 20:55:01

你的问题是:

代码语言:javascript
复制
return Flux.from(
        IntegrationFlows.from(

框架没有看到内部的IntegrationFlow实例来正确地解析和注册。

要使它正常工作,您需要考虑将IntegrationFlow声明为顶级bean。

就像这样:

代码语言:javascript
复制
@Bean
public Publisher<Message<SurveyResult>> surveyResultsReactiveSource() {  
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination(surveyDestination))
                    .log(LoggingHandler.Level.DEBUG)
                    .transform([transform to SurveyResult])
                    .toReactivePublisher();
}

现在,框架知道这个逻辑IntegrationFlow容器必须被解析,所有bean都必须注册和启动。

如果无法提供带有转换器的SurveyResultMessageConverter,则可能需要将您的transform()逻辑重新考虑为普通的Jms.messageDrivenChannelAdapter

那么在您的SurveyResultsSubscriptionDataFetcher中,您只需要拥有:

代码语言:javascript
复制
 return surveyResultsReactiveSource.map(Message::getPayload);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67184573

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档