我有以下代码:
/**
* Request wrapped around flowable.
*/
public abstract class RequestFlowable<T> {
private final PublishProcessor<String> mPublish;
private String mName;
public RequestFlowable(String name) {
mName = name;
mPublish = PublishProcessor.create();
}
public Flowable<T> getFlowable() {
//return createAction();
return mPublish.compose(new FlowableTransformer<String, T>() {
@Override
public Publisher<T> apply(@NonNull Flowable<String> upstream) {
return createAction();
}
});
/*
return mPublish.flatMap(new Function<String, Publisher<? extends T>>() {
@Override
public Publisher<? extends T> apply(@NonNull String s) throws Exception {
return createAction();
}
});
*/
}
protected abstract Flowable<T> createAction();
public String getName() {
return mName;
}
public void start() {
mPublish.onNext("processCommand");
}
@Override
public String toString() {
return "Request: " + mName;
}
}现在是单#编辑2
public abstract class Request<T> {
private final SingleSubject<Object> mPublish;
private String mName;
public Request(String name) {
mName = name;
mPublish = SingleSubject.create();
}
public Single<T> getSingle() {
return mPublish.flatMap(o -> createAction());
}
protected abstract Single<? extends T> createAction();
public String getName() {
return mName;
}
public void start() {
mPublish.onSuccess("Start");
}
@Override
public String toString() {
return "Request: " + mName;
}
}上面的代码与compose一起使用时起作用,就像上面代码中的代码一样,但是,如果我将注释的代码--也就是flatMap --因为某种原因而不执行createAction的话。
编辑2
上述代码是从另一个类调用的。相应的代码附在下面(增加了类的重要部分):
public class RequestQueue implements RequestController {
private static final String TAG = RequestQueue.class.getSimpleName();
private PublishSubject<Request> mRequest;
private PublishSubject<RequestFlowable> mRequestFlowable;
@Override
public <T> Single<T> registerRequest(Request<T> request) {
mRequest.onNext(request);
return request.getSingle();
}
@Override
public <T> Flowable<T> registerRequestFlowable(RequestFlowable<T> request) {
mRequestFlowable.onNext(request);
return request.getFlowable();
}
public RequestQueue() {
mRequest = PublishSubject.create();
mRequestFlowable = PublishSubject.create();
mRequest.subscribe(this::actionOnRequest);
mRequestFlowable.subscribe(this::actionOnRequest);
}
private void actionOnRequest(Request request) {
Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
request.start();
}
private void actionOnRequest(RequestFlowable request) {
Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
request.start();
}
}发布于 2017-07-25 07:51:44
(根据我的意见:)
为什么
Single能工作?
SingleSubject保留它接收到的单个终端事件。因为它只能接收onSuccess和onError,所以它将“重播”给晚订阅者(这也是为什么没有分隔器ReplaySingleSubject)。当在onSuccess上调用SingleSubject时,将记住该值,并在发生后的订阅时重新发出promplty,调用您的createAction。PublishProcessor也会记住它的终端事件,但是onNext不是终端事件,因此在没有使用者的情况下被删除。
如何通过
Processor实现所需的行为?
您可以重新组织逻辑,使用BehaviorProcessor或ReplayProcessor.createWithSize(1)。调用onComplete也不会执行flatMap函数。
https://stackoverflow.com/questions/45280992
复制相似问题