首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring集成RSocket和Spring RSocket交互问题

Spring集成RSocket和Spring RSocket交互问题
EN

Stack Overflow用户
提问于 2020-02-29 10:42:31
回答 1查看 396关注 0票数 1

我创建了一个新的示例,并将代码转移到客户端和服务器端。

完整的代码可以找到这里

服务器端有3种版本。

  • 服务器无Spring应用程序,使用Spring InboundGateway。
  • 服务器-引导重用Spring RSocket自动配置,并通过ServerRSocketMessageHanlder创建ServerRSocketConnecter
  • server-boot-messsagemapping不使用Spring,只使用Spring自动配置,以及@Controller@MessageMapping

有两个版本的客户端。

  • 客户端,使用发送消息。
  • client-requester使用RSocketRequester发送消息,根本不使用Spring。

客户机和服务器的交互模式是REQUEST_CHANNEL,并通过TCP/localhost连接服务器:7000。

服务器

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

申请课程:

代码语言:javascript
复制
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }

    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

服务器引导

pom.xml中的依赖项。

代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

application.properties

代码语言:javascript
复制
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

申请课。

代码语言:javascript
复制
@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

服务器-引导-消息映射

pom.xml中的依赖项。

代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

application.properties。

代码语言:javascript
复制
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

掌声课。

代码语言:javascript
复制
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}

客户端

在客户机中,pom.xml中的依赖项类似于。

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

申请课程:

代码语言:javascript
复制
@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}

在运行客户端和服务器应用程序时,尝试通过http://localhost:8080/hello访问curl

当使用服务器和使用InboundGateway处理消息的服务器引导时,输出如下所示。

代码语言:javascript
复制
curl http://localhost:8080/hello

data:ABCD

当使用server-boot-messagemapping,时,输出就像我预期的那样在工作:

代码语言:javascript
复制
data:A
data:B
data:C
data:D

客户请求者

代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

申请课程:

代码语言:javascript
复制
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@RestController
class HelloController {
    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}

在运行此客户端和3台服务器时,尝试通过http://localhost:8080/hello访问curl

当使用服务器和使用InboundGateway处理消息的服务器引导时,它会引发类强制转换异常。

当使用server-boot-messagemapping,时,输出就像我预期的那样在工作:

代码语言:javascript
复制
data:A
data:B
data:C
data:D

我不知道InboundGateway和OutboundGateway的配置问题在哪里?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-02 17:52:10

谢谢您提供如此详细的样品!

所以我所看到的。两个客户端(普通RSocketRequester和Spring )都能很好地使用普通RSocket服务器。

要使它们与一起工作,您必须执行以下更改:

  1. 服务器端:

.requestElementType(ResolvableType.forClass(String.class))添加到RSockets.inboundGateway()定义中,这样它将知道如何转换传入的有效负载。

  1. 客户端: .data(Flux.just("a\n", "b\n", "c\n", "d\n"))

目前,Spring的服务器端没有将传入的Flux视为独立的有效负载流。所以,我们试着把它们连接到一个单一的值中。新的行分隔符是我们期望独立值的指示符。它端的Spring消息正好相反:它检查multi-value期望的类型,并解码其map()中传入的Flux中的每个元素,而不是对整个Publisher解码进行尝试。

这将是一种破坏性的更改,但可能需要考虑修复RSocketInboundGateway逻辑,使其与用于RSocket支持的常规@MessageMapping保持一致。请随意提出GH问题!

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60464223

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档