首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >RxJava-同步流处理

RxJava-同步流处理
EN

Stack Overflow用户
提问于 2015-08-05 16:16:54
回答 1查看 433关注 0票数 1

我使用RXJava实现了一个简单的数据分析功能,其中主题订阅者异步处理发布到主题的数据,将输出保存到Redis。

当接收到消息时,Spring组件将其发布到可观察的。为了避免阻塞提交,我使用了RxJava异步来异步完成此操作。

代码语言:javascript
复制
@Override
public void onMessage(final TransactionalMessage message) {
    Async.start(new Func0<Void>() {
        @Override
        public Void call() {
            analyser.process(message);
            return null;
        }
    });
}

在实现其他处理部分时,我有两个困惑:( 1)通过缓冲创建一个可观察的异步的,2)根据消息列表上的消息类型计算不同逻辑的并行(并行)。

经过长时间的实验,我找到了两种方法来创建异步的、可观察的并且不确定哪一种是正确的和更好的方法。

第一条路

代码语言:javascript
复制
private static final class Analyzer {

private Subscriber<? super TransactionalMessage> subscriber;

public Analyzer() {
    OnSubscribe<TransactionalMessage> f = subscriber -> this.subscriber = subscriber;
    Observable.create(f).observeOn(Schedulers.computation())
            .buffer(5, TimeUnit.SECONDS, 5, Schedulers.io())
            .skipWhile((list) -> list == null || list.isEmpty())
            .subscribe(t -> compute(t));
}
public void process(TransactionalMessage message) {
        subscriber.onNext(message);
    }

}

路二

代码语言:javascript
复制
private static final class Analyser {

private PublishSubject<TransactionalMessage> subject;

public Analyser() {
    subject = PublishSubject.create();
    Observable<List<TransactionalMessage>> observable = subject
            .buffer(5, TimeUnit.SECONDS, 5, Schedulers.io())
            .observeOn(Schedulers.computation());
    observable.subscribe(new Observer<List<TransactionalMessage>>() {

    @Override
    public void onCompleted() {
        log.debug("[Analyser] onCompleted(), completed!");
    }

    @Override
    public void onError(Throwable e) {
        log.error("[Analyser] onError(), exception, ", e);
    }

    @Override
    public void onNext(List<TransactionalMessage> t) {
        compute(t);
    }
    });
}

public void process(TransactionalMessage message) {
    subject.onNext(message);
}
}

TransactionalMessage有不同的类型,因此我想根据这些类型执行不同的计算。我尝试过的一种方法是根据每种类型过滤列表,并分别处理它们,但这看起来很糟糕,我认为不能并行工作。如何并行处理它们?

代码语言:javascript
复制
protected void compute(List<TransactionalMessage> messages) {
        Observable<TransactionalMessage> observable = Observable
                .from(messages);
        Observable<String> observable2 = observable
                .filter(new Func1<TransactionalMessage, Boolean>() {

                    @Override
                    public Boolean call(TransactionalMessage t) {
                        return t.getMsgType()
                                .equals(OttMessageType.click.name());
                    }
                }).flatMap(
                        new Func1<TransactionalMessage, Observable<String>>() {

                            @Override
                            public Observable<String> call(
                                    TransactionalMessage t) {
                                return Observable.just(
                                        t.getMsgType() + t.getAppId());
                            }
                        });

        Observable<String> observable3 = observable
                .filter(new Func1<TransactionalMessage, Boolean>() {

                    @Override
                    public Boolean call(TransactionalMessage t) {
                        return t.getMsgType()
                                .equals(OttMessageType.image.name());
                    }
                }).flatMap(
                        new Func1<TransactionalMessage, Observable<String>>() {

                            @Override
                            public Observable<String> call(
                                    TransactionalMessage t) {
                                return Observable.just(
                                        t.getMsgType() + t.getAppId());
                            }
                        });

        // I sense some code smell in filtering on type and processing it.

        Observable.merge(observable2, observable3)
                .subscribe(new Action1<String>() {

                    @Override
                    public void call(String t) {
                        // save it to redis
                        System.out.println(t);
                    }
                });
    }
EN

回答 1

Stack Overflow用户

发布于 2015-10-14 21:33:58

我建议在尝试使用Subject之前考虑create

如果您希望基于某种分类完成并行处理,可以使用groupByobserveOn来实现所需的效果:

代码语言:javascript
复制
Observable.range(1, 100)
.groupBy(v -> v % 3)
.flatMap(g -> 
     g.observeOn(Schedulers.computation())
     .reduce(0, (a, b) -> a + b)
     .map(v -> g.getKey() + ": " + v)
)
.toBlocking().forEach(System.out::println);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31837780

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档