首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用租约自定义ServerRSocketFactory

如何使用租约自定义ServerRSocketFactory
EN

Stack Overflow用户
提问于 2019-10-08 11:09:12
回答 1查看 563关注 0票数 1

我使用Spring5.2、SpringBoot2.2.0M6和SpringCloudHoxton.M2编写了一个简单的RSocket服务器和客户机。

我试图通过添加根据官方的RSocket来处理租约来定制ServerRSocketFactory服务器的RSocket样品

我正在使用ServerRSocketFactoryCustomizer添加租约处理。但是,当我声明自定义程序bean并启动服务器时,我会收到Spring的FunctionConfiguration (版本3.0.0.M3)的异常,上面写着“在BeanFactory中找到了多个函数”。

代码语言:javascript
复制
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'standAloneSupplierFlow' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'standAloneSupplierFlow' threw exception; nested exception is java.lang.IllegalArgumentException: Found more then one function in BeanFactory
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:645) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:625) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1339) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1178) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:878) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877) ~[spring-context-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
    at com.equalities.cloud.rsocket.server.RsocketServerApplication.main(RsocketServerApplication.java:19) ~[classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'standAloneSupplierFlow' threw exception; nested exception is java.lang.IllegalArgumentException: Found more then one function in BeanFactory
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:640) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    ... 17 common frames omitted
Caused by: java.lang.IllegalArgumentException: Found more then one function in BeanFactory
    at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.discoverDefaultDefinitionIfNecessary(BeanFactoryAwareFunctionRegistry.java:194) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.compose(BeanFactoryAwareFunctionRegistry.java:212) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
    at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:104) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
    at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:72) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
    at org.springframework.cloud.stream.function.FunctionConfiguration.standAloneSupplierFlow(FunctionConfiguration.java:90) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$$EnhancerBySpringCGLIB$$f29b466d.CGLIB$standAloneSupplierFlow$1(<generated>) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
    at org.springframework.cloud.stream.function.FunctionConfiguration$$EnhancerBySpringCGLIB$$f29b466d$$FastClassBySpringCGLIB$$d3c910a8.invoke(<generated>) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363) ~[spring-context-5.2.0.RC2.jar:5.2.0.RC2]
    at org.springframework.cloud.stream.function.FunctionConfiguration$$EnhancerBySpringCGLIB$$f29b466d.standAloneSupplierFlow(<generated>) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
    ... 18 common frames omitted

我使用以下代码声明自定义程序:

代码语言:javascript
复制
@Bean
public ServerRSocketFactoryCustomizer leaseCustomizer() {
  // Here, we return a ServerRSocketFactoryCustomizer bean to influence 
  // how the RSocket server is configured.
  //
  // A ServerRSocketFactory is defined by rsocket-java as an API that 
  // is used to create a server-side RSocket, using RSocketFactory.receive(). 
  // Among other things, it is used to configure leases to clients as shown in this sample:
  // https://github.com/rsocket/rsocket-java/blob/master/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/lease/LeaseExample.java
  // 
  // On the client side, a similar class, ClientRSocketFactory, exists.
  // This can be customized using the RSocketRequester.Builder's .rsocketFactory() method.
  // See: https://docs.spring.io/spring/docs/5.2.0.RELEASE/spring-framework-reference/web-reactive.html#rsocket-requester-client-advanced

  // See: org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration
  return new LeaseCustomizer();
}

其中LeaseCustomizer看起来如下:

代码语言:javascript
复制
public class LeaseCustomizer implements ServerRSocketFactoryCustomizer {

  @Override
  public ServerRSocketFactory apply(ServerRSocketFactory factory) {
    factory.lease(() -> Leases.<NoopStats>create()
                              .sender(new LeaseSender("Server", 7_000, 5))
                              .receiver(new LeaseReceiver("Server")));

    return factory;
  }

  private static class NoopStats implements LeaseStats {
    @Override
    public void onEvent(EventType eventType) {}
  }

  @Slf4j
  private static class LeaseSender implements Function<Optional<NoopStats>, Flux<Lease>> {
    private final String tag;
    private final int ttlMillis;
    private final int allowedRequests;

    public LeaseSender(String tag, int ttlMillis, int allowedRequests) {
      this.tag = tag;
      this.ttlMillis = ttlMillis;
      this.allowedRequests = allowedRequests;
    }

    @Override
    public Flux<Lease> apply(Optional<NoopStats> leaseStats) {
      log.info("{} stats are {}", tag, leaseStats.isPresent() ? "present" : "absent");
      return Flux.interval(ofSeconds(1), ofSeconds(10))
                 .onBackpressureLatest()
                 .map( tick -> {
                    log.info("{} responder sends new leases: ttl: {}, requests: {}", tag, ttlMillis, allowedRequests);
                    return Lease.create(ttlMillis, allowedRequests);
                 });
    }
  }

  @Slf4j
  private static class LeaseReceiver implements Consumer<Flux<Lease>> {
    private final String tag;

    public LeaseReceiver(String tag) {
      this.tag = tag;
    }

    @Override
    public void accept(Flux<Lease> receivedLeases) {
      receivedLeases.subscribe(lease -> log.info("{} received leases - ttl: {}, requests: {}", tag, lease.getTimeToLiveMillis(), lease.getAllowedRequests()));
    }
  }
}

我的bootstrap.yml看起来如下:

代码语言:javascript
复制
debug: true

server:
  port: ${PORT:3333}

spring:
  application:
    name: rsocket-server

  cloud:
    config:
      discovery:
        enabled: true
        service-id: config-server # should come from environment

  rsocket:
    server:
      port: 9999
      transport: tcp

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka

我的pom.xml看起来如下:

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M6</version>
    <relativePath /> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.equalities.cloud</groupId>
  <artifactId>rsocket-server</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>rsocket-server</name>
  <description>An RSocket server application</description>

  <properties>
    <java.version>11</java.version>
    <spring-cloud.version>Hoxton.M2</spring-cloud.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-bus</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-config</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-zipkin</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope> <!-- See: https://projectlombok.org/setup/maven -->
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>org.junit.vintage</groupId>
          <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
    </pluginRepository>
  </pluginRepositories>

</project>

如您所见,我将Spring-Cloud-Config (服务器)与Spring–Boot-Starter-AMQPSpring–Cloud-Stream-Binder-RabbitMQ结合使用。

我已经调试了这个问题的来源,在我看来,这要么是Spring支持的问题,要么是Spring-Cloud-Stream的问题。

问题是,ServerRSocketFactoryCustomizer是一个@FunctionalInterface,即行为类似于一个函数,并由org.springframework.cloud.stream.function.FunctionConfiguration类获取,该类内部调用functionCatalog.lookup(functionProperties.getDefinition()),该类试图从注册表中执行的查找--这正是一个函数。由于我声明了注册表包含ServerRSocketFactoryCustomizer两个函数,因此抛出了异常。

通常,我希望我可以声明尽可能多的ServerRSocketFactoryCustomizer bean,并相应地对它们进行@Order以影响RSocket服务器的行为方式。今天,这似乎是不可能的,而且基于“”的对RSocket的Spring支持也隐藏了RSocket服务器套接字,这有点遗憾。

有没有任何方式可以自定义Spring/Spring提供的RSocket服务器来添加租约等等,就像我试图做的那样?

谢谢!

EN

回答 1

Stack Overflow用户

发布于 2019-10-08 12:30:04

因此,这是一个问题,在某种程度上,春云功能的一个特性是get。我们正在讨论它,并将很快修复它,但是现在这里有一个快速的解决方法。只需添加一个属性--spring.cloud.function.definition=blah,其中blah是不存在的,您应该没事。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58285184

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档