首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rx java缓冲区正在丢失项目

rx java缓冲区正在丢失项目
EN

Stack Overflow用户
提问于 2018-10-27 21:11:52
回答 1查看 125关注 0票数 0

我有一个采用这种配置的PublishSubject:

代码语言:javascript
复制
PublishSubject<Message> messageObserver = 
    messageObserver
    .filter(t -> test(t))
    .buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
    .subscribe(messages -> saveToDB(messages));

我的应用程序的不同线程正在通过onNext()向这个PublishSubject写入消息。

正如我所看到的,ObservableBufferTimed.BufferExactBoundedObserver底层的buffer是非线程安全的,因为它的onNext看起来如下所示:

代码语言:javascript
复制
public void onNext(T t) {
            U b;
            synchronized (this) {
                b = buffer;
                if (b == null) {
                    return;
                }

                b.add(t);

                if (b.size() < maxSize) {
                    return;
                }
                buffer = null;
                producerIndex++;
            }

            if (restartTimerOnMaxSize) {
                timer.dispose();
            }

            fastPathOrderedEmit(b, false, this);

            try {
                b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The buffer supplied is null");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                actual.onError(e);
                dispose();
                return;
            }

            synchronized (this) {
                buffer = b;
                consumerIndex++;
            }
            if (restartTimerOnMaxSize) {
                timer = w.schedulePeriodically(this, timespan, timespan, unit);
            }
        }

为了使竞争条件更加明显,我将eventsSaveTimeSpanInSecondseventsSaveCount参数设置为1 (1秒内发生1个事件)。

问题出现在此块中:

代码语言:javascript
复制
synchronized (this) {
                b = buffer;
                if (b == null) {
                    return;
                }

                b.add(t);

                if (b.size() < maxSize) {
                    return;
                }
                buffer = null;
                producerIndex++;
            }

因此,如果两条消息同时缓冲,则第一条消息填充buffer并将null赋值给缓冲区变量。新的缓冲区将在同步块之后初始化。如果存在竞争条件,当buffer为null时,由于代码原因,第二条消息将不会缓冲:

代码语言:javascript
复制
if (b == null) {
  return;
}

这是一个缺陷还是正确的缓冲区行为?我怎样才能避免这种情况?

EN

回答 1

Stack Overflow用户

发布于 2018-11-04 20:27:53

如果多个线程要调用onNext,请使用序列化主题

代码语言:javascript
复制
Subject<Message> messageObserver = PublishSubject.<Message>create().toSerialized();

messageObserver
.filter(t -> test(t))
.buffer(eventsSaveTimeSpanInSeconds, TimeUnit.SECONDS, eventsSaveCount)
.subscribe(messages -> saveToDB(messages));

// from any thread now
messageObserver.onNext(message);
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53022208

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档