我使用RXJava实现了一个简单的数据分析功能,其中主题订阅者异步处理发布到主题的数据,将输出保存到Redis。
当接收到消息时,Spring组件将其发布到可观察的。为了避免阻塞提交,我使用了RxJava异步来异步完成此操作。
@Override
public void onMessage(final TransactionalMessage message) {
Async.start(new Func0<Void>() {
@Override
public Void call() {
analyser.process(message);
return null;
}
});
}在实现其他处理部分时,我有两个困惑:( 1)通过缓冲创建一个可观察的异步的,2)根据消息列表上的消息类型计算不同逻辑的并行(并行)。
经过长时间的实验,我找到了两种方法来创建异步的、可观察的并且不确定哪一种是正确的和更好的方法。
第一条路
private static final class Analyzer {
private Subscriber<? super TransactionalMessage> subscriber;
public Analyzer() {
OnSubscribe<TransactionalMessage> f = subscriber -> this.subscriber = subscriber;
Observable.create(f).observeOn(Schedulers.computation())
.buffer(5, TimeUnit.SECONDS, 5, Schedulers.io())
.skipWhile((list) -> list == null || list.isEmpty())
.subscribe(t -> compute(t));
}
public void process(TransactionalMessage message) {
subscriber.onNext(message);
}}
路二
private static final class Analyser {
private PublishSubject<TransactionalMessage> subject;
public Analyser() {
subject = PublishSubject.create();
Observable<List<TransactionalMessage>> observable = subject
.buffer(5, TimeUnit.SECONDS, 5, Schedulers.io())
.observeOn(Schedulers.computation());
observable.subscribe(new Observer<List<TransactionalMessage>>() {
@Override
public void onCompleted() {
log.debug("[Analyser] onCompleted(), completed!");
}
@Override
public void onError(Throwable e) {
log.error("[Analyser] onError(), exception, ", e);
}
@Override
public void onNext(List<TransactionalMessage> t) {
compute(t);
}
});
}
public void process(TransactionalMessage message) {
subject.onNext(message);
}
}TransactionalMessage有不同的类型,因此我想根据这些类型执行不同的计算。我尝试过的一种方法是根据每种类型过滤列表,并分别处理它们,但这看起来很糟糕,我认为不能并行工作。如何并行处理它们?
protected void compute(List<TransactionalMessage> messages) {
Observable<TransactionalMessage> observable = Observable
.from(messages);
Observable<String> observable2 = observable
.filter(new Func1<TransactionalMessage, Boolean>() {
@Override
public Boolean call(TransactionalMessage t) {
return t.getMsgType()
.equals(OttMessageType.click.name());
}
}).flatMap(
new Func1<TransactionalMessage, Observable<String>>() {
@Override
public Observable<String> call(
TransactionalMessage t) {
return Observable.just(
t.getMsgType() + t.getAppId());
}
});
Observable<String> observable3 = observable
.filter(new Func1<TransactionalMessage, Boolean>() {
@Override
public Boolean call(TransactionalMessage t) {
return t.getMsgType()
.equals(OttMessageType.image.name());
}
}).flatMap(
new Func1<TransactionalMessage, Observable<String>>() {
@Override
public Observable<String> call(
TransactionalMessage t) {
return Observable.just(
t.getMsgType() + t.getAppId());
}
});
// I sense some code smell in filtering on type and processing it.
Observable.merge(observable2, observable3)
.subscribe(new Action1<String>() {
@Override
public void call(String t) {
// save it to redis
System.out.println(t);
}
});
}发布于 2015-10-14 21:33:58
我建议在尝试使用Subject之前考虑create。
如果您希望基于某种分类完成并行处理,可以使用groupBy和observeOn来实现所需的效果:
Observable.range(1, 100)
.groupBy(v -> v % 3)
.flatMap(g ->
g.observeOn(Schedulers.computation())
.reduce(0, (a, b) -> a + b)
.map(v -> g.getKey() + ": " + v)
)
.toBlocking().forEach(System.out::println);https://stackoverflow.com/questions/31837780
复制相似问题