首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >第二,通量从未开始做它的工作。

第二,通量从未开始做它的工作。
EN

Stack Overflow用户
提问于 2017-06-27 09:31:37
回答 1查看 319关注 0票数 0

我想测试弹簧反应堆,并为此实现了一个小例子,下面是相关代码:

应用程序:

代码语言:javascript
复制
@SpringBootApplication
public class Application implements CommandLineRunner {
    @Autowired
    private ServiceData data;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        this.data.start();
    }
}

FluxCreatorFunction

代码语言:javascript
复制
public class FluxCreatorFunction<T extends Function<V, E>, V, E> {

    public ConnectableFlux<E> createFlux(T t, V v) {
        return (ConnectableFlux<E>) Flux.<E>create(flux -> {
            while (true) {
                try {
                    Thread.sleep((ThreadLocalRandom.current().nextInt(1, (3 + 1)) * 1000));

                    flux.next(t.apply(v));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).publish();
    }   
}

ServiceData

代码语言:javascript
复制
@Service
public class ServiceData {

    @Autowired
    @Qualifier("inserter")
    private ConnectableFlux<Todo> fluxInserter;

    @Autowired
    @Qualifier("deleter")
    private ConnectableFlux<Todo> fluxDeleter;

    @Autowired
    private TodoRepository repo;

    public void start() {
        this.startInserter();
        this.startDeleter();
    }

    public void startInserter() {
        this.fluxInserter.subscribe(new ConsumerInserter());
        this.fluxInserter.subscribe((todo) -> {
            this.repo.save(todo);
            this.repo.flush();
        });

        this.fluxInserter.connect();
    }

    public void startDeleter() {
        this.fluxDeleter.subscribe(new ConsumerDeleter());
        this.fluxDeleter.subscribe((todo) -> {
            this.repo.delete(todo);
            this.repo.flush();
        });

        this.fluxDeleter.connect();
    }

    @Bean
    @Qualifier("inserter")
    public ConnectableFlux<Todo> createInserter() {
        return new FluxCreatorFunction<FunctionInserter, Void, Todo>().createFlux(new FunctionInserter(), null);
    }

    @Bean
    @Qualifier("deleter")
    public ConnectableFlux<Todo> createDeleter() {
        return new FluxCreatorFunction<FunctionDeleter, TodoRepository, Todo>().createFlux(new FunctionDeleter(), this.repo);
    }
}

class FunctionInserter implements Function<Void, Todo> {

    private RestTemplate restTemplate = new RestTemplate();

    @Override
    public Todo apply(Void v) {
        String quote = this.restTemplate.getForObject("http://gturnquist-quoters.cfapps.io/api/random", QuoteResource.class).getValue().getQuote();

        return new Todo(quote, false);
    }
}

class FunctionDeleter implements Function<TodoRepository, Todo> {

    @Override
    public Todo apply(TodoRepository repo) {
        return repo.findAll().get(0);
    }
}

class ConsumerInserter implements Consumer<Todo> {

    @Override
    public void accept(Todo todo) {
        System.out.println("New todo: " + todo.getText());
    }
}

class ConsumerDeleter implements Consumer<Todo> {

    @Override
    public void accept(Todo todo) {
        System.out.println("Deleted todo: " + todo.getText());
    }
}

如您所见,我正在创建两个不同的Flux发布服务器。两者都是作为服务的@Bean@Autowired创建的。

问题是: Onlay,第一个Flux正在做它的工作。如果我先启动插入器:

代码语言:javascript
复制
this.startInserter();
this.startDeleter();

产出如下:

代码语言:javascript
复制
New todo: So easy it is to switch container in #springboot.
New todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
New todo: Working with Spring Boot is like pair-programming with the Spring developers.
New todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
New todo: The real benefit of Boot, however, is that it's just Spring. That means any direction the code takes, regardless of complexity, I know it's a safe bet.
New todo: I have two hours today to build an app from scratch. @springboot to the rescue!
New todo: Really loving Spring Boot, makes stand alone Spring apps easy.
New todo: Really loving Spring Boot, makes stand alone Spring apps easy.

如果我转过身来:

代码语言:javascript
复制
this.startDeleter();
this.startInserter();

产出如下:

代码语言:javascript
复制
Deleted todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
Deleted todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
Deleted todo: The real benefit of Boot, however, is that it's just Spring. That means any direction the code takes, regardless of complexity, I know it's a safe bet.
Deleted todo: Spring has come quite a ways in addressing developer enjoyment and ease of use since the last time I built an application using it.
Deleted todo: So easy it is to switch container in #springboot.
Deleted todo: I have two hours today to build an app from scratch. @springboot to the rescue!
Deleted todo: Really loving Spring Boot, makes stand alone Spring apps easy.
Deleted todo: So easy it is to switch container in #springboot.

所以,我先从哪个Flux开始并不重要。第二个Flux从来没有做过它的工作,我不知道为什么。两个Flux都运行在同一个线程上吗?他们需要一个标识符吗?还有什么问题吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-06-27 16:56:40

Flux.create中生成数据的方式必须是异步的。在这里,您正在阻塞,这是不支持的。是的,两个Flux最终都在同一个线程上执行,而第一个线程则无限期地循环并阻塞所述线程。

您可以使用"subscribeOn“和调度程序(如Schedulers.parallel()Schedulers.elastic() )将工作推迟到单独的线程上。

您还可以尝试避免使用create/generate,并通过使用时间运算符实现完全非阻塞。例如,最新的3.1.0.M2里程碑有delayUntil。您也可以使用类似于Flux.range(1, n).concatMap(index -> Mono.delay(generateRandomDelayValue(index))的东西

编辑:我可以想出一种复制您尝试的随机延迟行为的方法是:

代码语言:javascript
复制
public class FluxCreatorFunction<T extends Function<V, E>, V, E> {

  public ConnectableFlux<E> createFlux(T t, V v) {
    //use generate to generate a random value per cycle
    return Flux.generate(sink -> ThreadLocalRandom
                                 .current()
                                 .nextInt(1, (3 + 1)))
               //that random value will be used as a delay, so we need to
               //transform each value into a new async sequence, and also
               //ensure that the order is preserved, hence concatMap
               .concatMap(randomDelay ->
                          //we introduce a delay then...
                          Mono.delay(Duration.ofSeconds(randomDelay))
                              //... map to the result of the function
                              .map(ignore -> t.apply(v))
               .publish();
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44776815

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档