我有一个采用这种配置的PublishSubject:
PublishSubject<Message> messageObserver =
messageObserver
.filter(t -> test(t))
.buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
.subscribe(messages -> saveToDB(messages));我的应用程序的不同线程正在通过onNext()向这个PublishSubject写入消息。
正如我所看到的,ObservableBufferTimed.BufferExactBoundedObserver底层的buffer是非线程安全的,因为它的onNext看起来如下所示:
public void onNext(T t) {
U b;
synchronized (this) {
b = buffer;
if (b == null) {
return;
}
b.add(t);
if (b.size() < maxSize) {
return;
}
buffer = null;
producerIndex++;
}
if (restartTimerOnMaxSize) {
timer.dispose();
}
fastPathOrderedEmit(b, false, this);
try {
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
dispose();
return;
}
synchronized (this) {
buffer = b;
consumerIndex++;
}
if (restartTimerOnMaxSize) {
timer = w.schedulePeriodically(this, timespan, timespan, unit);
}
}为了使竞争条件更加明显,我将eventsSaveTimeSpanInSeconds和eventsSaveCount参数设置为1 (1秒内发生1个事件)。
问题出现在此块中:
synchronized (this) {
b = buffer;
if (b == null) {
return;
}
b.add(t);
if (b.size() < maxSize) {
return;
}
buffer = null;
producerIndex++;
}因此,如果两条消息同时缓冲,则第一条消息填充buffer并将null赋值给缓冲区变量。新的缓冲区将在同步块之后初始化。如果存在竞争条件,当buffer为null时,由于代码原因,第二条消息将不会缓冲:
if (b == null) {
return;
}这是一个缺陷还是正确的缓冲区行为?我怎样才能避免这种情况?
发布于 2018-11-04 20:27:53
如果多个线程要调用onNext,请使用序列化主题
Subject<Message> messageObserver = PublishSubject.<Message>create().toSerialized();
messageObserver
.filter(t -> test(t))
.buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
.subscribe(messages -> saveToDB(messages));
// from any thread now
messageObserver.onNext(message);https://stackoverflow.com/questions/53022208
复制相似问题