
EventBus in Reactor 3.x

Stack Overflow user
Asked on 2017-04-14 07:33:43
Answers: 1 · Views: 5.2K · Followers: 0 · Votes: 10

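I know that EventBus is deprecated in Reactor 3.x and that the suggested replacement is ReplayProcessor. I have read https://github.com/reactor/reactor-core/issues/375, but the code there is only a rough draft. I created a demo project to try out the idea. Could someone comment on it?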

======== Application.java

package hello;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.BaseSubscriber;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {

    private static final int NUMBER_OF_QUOTES = 10;

    @Bean
    // Declare the element type so injection sites do not fall back to a raw type.
    ReplayProcessor<MyEvent> createReplayProcessor() {

        ReplayProcessor<MyEvent> rp = ReplayProcessor.create();

        Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));

        Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));

        interest1.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }
            @Override
            protected void hookOnNext(MyEvent value) {
                //todo: call service method
                System.out.println("event 1 handler -> event name:" + value.getEventName());
            }

        });


        interest2.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }
            @Override
            protected void hookOnNext(MyEvent value) {
                //todo: call service method
                System.out.println("event2 handler -> event name:" + value.getEventName());
            }
        });

        return rp;
    }

    // Null-safe: equalsIgnoreCase(null) returns false, so no separate name check is needed.
    public boolean filterInterest1(MyEvent myEvent) {
        return myEvent != null && "event1".equalsIgnoreCase(myEvent.getEventName());
    }

    public boolean filterInterest2(MyEvent myEvent) {
        return myEvent != null && "event2".equalsIgnoreCase(myEvent.getEventName());
    }


    @Autowired
    private Publisher publisher;

    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(NUMBER_OF_QUOTES);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publishQuotes(NUMBER_OF_QUOTES);
    }

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext app = SpringApplication.run(Application.class, args);

        app.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);


    }

}

==========MyEvent.java=============

package hello;

public class MyEvent {

    private String eventName = "";

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public MyEvent(String eventName) {
        this.eventName =  eventName;
    }


    public void filterInterest1(MyEvent myEvent) {

    }
}

=============Publisher.java ===========

package hello;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.ReplayProcessor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class Publisher {

    @Autowired
    ReplayProcessor<MyEvent> rp; // typed to avoid unchecked onNext calls

    @Autowired
    CountDownLatch latch;

    public void publishQuotes(int numberOfQuotes) throws InterruptedException {
        long start = System.currentTimeMillis();

        rp.onNext(new MyEvent("event1"));
        rp.onNext(new MyEvent("event2"));
        rp.onNext(new MyEvent("event3"));

        long elapsed = System.currentTimeMillis() - start;

        System.out.println("Elapsed time: " + elapsed + "ms");
        System.out.println("Average time per quote: " + elapsed / numberOfQuotes + "ms");
    }

}

The full code is at https://github.com/yigubigu/reactor-start-sample.git
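
To make the behaviour of the wiring above concrete, here is a minimal JDK-only sketch of a replaying bus with per-subscriber filters. It is a hand-rolled stand-in, not Reactor code: ReplayingBus, publish, subscribe and demo are hypothetical names, and events are plain strings rather than MyEvent. It mirrors two properties of an unbounded ReplayProcessor combined with filter: every event is retained, and a late subscriber first receives the matching history.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for "ReplayProcessor + filter"; not part of Reactor.
class ReplayingBus {
    // One registered handler together with the filter that selects its events.
    private static final class Registration {
        final Predicate<String> interest;
        final Consumer<String> handler;

        Registration(Predicate<String> interest, Consumer<String> handler) {
            this.interest = interest;
            this.handler = handler;
        }
    }

    // Unbounded history, comparable to what ReplayProcessor.create() retains.
    private final List<String> history = new ArrayList<>();
    private final List<Registration> registrations = new ArrayList<>();

    // Record the event, then hand it to every handler whose filter matches.
    synchronized void publish(String eventName) {
        history.add(eventName);
        for (Registration r : registrations) {
            if (r.interest.test(eventName)) {
                r.handler.accept(eventName);
            }
        }
    }

    // Replay matching history to the new handler, then register it for future events.
    synchronized void subscribe(Predicate<String> interest, Consumer<String> handler) {
        for (String past : history) {
            if (interest.test(past)) {
                handler.accept(past);
            }
        }
        registrations.add(new Registration(interest, handler));
    }

    // Small demo: one early subscriber and one late subscriber.
    static List<String> demo() {
        ReplayingBus bus = new ReplayingBus();
        List<String> log = new ArrayList<>();
        bus.subscribe("event1"::equalsIgnoreCase, e -> log.add("handler1:" + e));
        bus.publish("event1");
        bus.publish("event2");
        bus.publish("event3");
        // The late subscriber still sees the earlier "event2" because history is replayed.
        bus.subscribe("event2"::equalsIgnoreCase, e -> log.add("handler2:" + e));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [handler1:event1, handler2:event2]
    }
}
```

In the question's code both subscribers attach before the first onNext, so replay is never exercised there, while the retained history keeps growing for as long as the processor lives.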


1 Answer

Stack Overflow user

Accepted answer

Answered on 2018-11-14 05:22:28

You can rely on Spring's event handlers. Matt and Josh Long use this approach in a couple of tutorials.

Key takeaways:

@Component class ProfileCreatedEventPublisher implements ApplicationListener<ProfileCreatedEvent>, Consumer<FluxSink<ProfileCreatedEvent>>

uses an event loop to take events from a LinkedBlockingQueue.

@Override public void onApplicationEvent(ProfileCreatedEvent event)

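queues events, which can be published from anywhere in the application.

ProfileCreatedEventPublisher is used in ServerSentEventController to create a Flux of events (which can be chained with filter); the controller transforms the events and sends them to the web client.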
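
The takeaways above can be sketched without Spring or Reactor on the classpath. This is an illustration under stated assumptions, not the tutorials' code: QueueEventRelay, onEvent and demo are hypothetical names, events are plain strings, and a java.util.function.Consumer<String> stands in for FluxSink so the example runs on the JDK alone. What it keeps from the description is the shape: any thread may enqueue an event, and a single loop drains the LinkedBlockingQueue into the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical JDK-only analogue of the publisher described above.
// Consumer<Consumer<String>> mirrors Consumer<FluxSink<ProfileCreatedEvent>>.
class QueueEventRelay implements Consumer<Consumer<String>> {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Plays the role of onApplicationEvent: callable from any thread.
    void onEvent(String event) {
        queue.offer(event);
    }

    // Plays the role of accept(FluxSink): starts the loop that drains the queue into the sink.
    @Override
    public void accept(Consumer<String> sink) {
        Thread loop = new Thread(() -> {
            try {
                while (true) {
                    sink.accept(queue.take()); // blocks until an event is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop when interrupted
            }
        }, "event-loop");
        loop.setDaemon(true); // do not keep the JVM alive just for this loop
        loop.start();
    }

    // Publishes three events and collects what passes a filter on the consuming side.
    static List<String> demo() {
        QueueEventRelay relay = new QueueEventRelay();
        List<String> received = new ArrayList<>();
        CountDownLatch seen = new CountDownLatch(3);

        relay.onEvent("event1"); // queued before any sink is attached
        relay.accept(e -> {
            if (!e.equals("event2")) { // stands in for a filter chained on the Flux
                received.add(e);
            }
            seen.countDown();
        });
        relay.onEvent("event2");
        relay.onEvent("event3");

        try {
            if (!seen.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not drain the queue in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [event1, event3]
    }
}
```

The queue decouples publishers from the consumer: onEvent returns immediately, and ordering is preserved because a single loop takes from a FIFO queue.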

Votes: 2
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/43407227
