首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Emitter.onComplete后缓冲流中的线程中断

Emitter.onComplete后缓冲流中的线程中断
EN

Stack Overflow用户
提问于 2020-03-06 15:30:30
回答 1查看 33关注 0票数 0

我有一个Flowable,类似于:

代码语言:javascript
复制
Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
    .doOnSubscribe(sub -> {
        System.out.println("onSubscribe");
    })
    .onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnTerminate(() -> {
        System.out.println("onTerminate");
    })
    .onErrorReturn(error -> {
        System.out.println("Error, will cancel scan: " + error);
        throw new RuntimeException(error);
    })
    .subscribe(objs -> /* NIO work here */);

当我在源发射器上调用onComplete时,紧接着onNext调用,我在subscribe lambda中得到一个中断。我认为这是预期的行为:https://github.com/ReactiveX/RxJava/issues/3601。这似乎发生在订阅者从缓冲区处理下游对象时(我猜如果它是单线程的,就不会有问题)。

这是一个问题,因为我在这里执行NIO工作,对于在Thread.interrupt上发生的事情有非常具体的要求。这发生在缓冲区完成发送所有工作之前,因此有些objs没有被完全处理。

这与Scheduler有关吗?我应该使用IO一号吗?如何“保护”在订阅服务器中执行的工作?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-03-06 16:49:33

我应该在IO线程上注册我的订阅者:

代码语言:javascript
复制
    .subscribeOn(Schedulers.io(), false)
    .subscribe(objs -> /* NIO work here */);

我想我假设,由于上游的buffer,中断被放置在来自buffer的线程上,这也会被subscribeOn选择的任何线程所做。但情况似乎并非如此。

如果有人能提供更好的解释,我想读一读。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60567165

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档