我创建了一个新的示例,并将代码转移到客户端和服务器端。
完整的代码可以找到这里。
服务器端有3种版本。
ServerRSocketMessageHanlder创建ServerRSocketConnecter。@Controller和@MessageMapping。有两个版本的客户端。
RSocketRequester发送消息,根本不使用Spring。客户机和服务器的交互模式是REQUEST_CHANNEL,并通过TCP/localhost连接服务器:7000。
服务器
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>申请课程:
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
System.out.println("Press any key to exit.");
System.in.read();
} finally {
System.out.println("Exited.");
}
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 7000);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}服务器引导
pom.xml中的依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>application.properties
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp申请课。
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
SpringApplication.run(DemoApplication.class, args);
}
// see PR: https://github.com/spring-projects/spring-boot/pull/18834
@Bean
ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
var handler = new ServerRSocketMessageHandler(true);
handler.setRSocketStrategies(rSocketStrategies);
return handler;
}
@Bean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
return new ServerRSocketConnector(serverRSocketMessageHandler);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}服务器-引导-消息映射
pom.xml中的依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>application.properties。
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp掌声课。
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Controller
class UpperCaseHandler {
@MessageMapping("/uppercase")
public Flux<String> uppercase(Flux<String> input) {
return input.map(String::toUpperCase);
}
}客户端
在客户机中,pom.xml中的依赖项类似于。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>申请课程:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
}
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}在运行客户端和服务器应用程序时,尝试通过http://localhost:8080/hello访问curl。
当使用服务器和使用InboundGateway处理消息的服务器引导时,输出如下所示。
curl http://localhost:8080/hello
data:ABCD当使用server-boot-messagemapping,时,输出就像我预期的那样在工作:
data:A
data:B
data:C
data:D客户请求者
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>申请课程:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@RestController
class HelloController {
Mono<RSocketRequester> requesterMono;
public HelloController(RSocketRequester.Builder builder) {
this.requesterMono = builder.connectTcp("localhost", 7000);
}
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return requesterMono.flatMapMany(
rSocketRequester -> rSocketRequester.route("/uppercase")
.data(Flux.just("a", "b", "c", "d"))
.retrieveFlux(String.class)
);
}
}在运行此客户端和3台服务器时,尝试通过http://localhost:8080/hello访问curl。
当使用服务器和使用InboundGateway处理消息的服务器引导时,它会引发类强制转换异常。
当使用server-boot-messagemapping,时,输出就像我预期的那样在工作:
data:A
data:B
data:C
data:D我不知道InboundGateway和OutboundGateway的配置问题在哪里?
发布于 2020-03-02 17:52:10
谢谢您提供如此详细的样品!
所以我所看到的。两个客户端(普通RSocketRequester和Spring )都能很好地使用普通RSocket服务器。
要使它们与一起工作,您必须执行以下更改:
将.requestElementType(ResolvableType.forClass(String.class))添加到RSockets.inboundGateway()定义中,这样它将知道如何转换传入的有效负载。
.data(Flux.just("a\n", "b\n", "c\n", "d\n"))。目前,Spring的服务器端没有将传入的Flux视为独立的有效负载流。所以,我们试着把它们连接到一个单一的值中。新的行分隔符是我们期望独立值的指示符。它端的Spring消息正好相反:它检查multi-value期望的类型,并解码其map()中传入的Flux中的每个元素,而不是对整个Publisher解码进行尝试。
这将是一种破坏性的更改,但可能需要考虑修复RSocketInboundGateway逻辑,使其与用于RSocket支持的常规@MessageMapping保持一致。请随意提出GH问题!
https://stackoverflow.com/questions/60464223
复制相似问题