我有一个Flowable,类似于:
Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
.doOnSubscribe(sub -> {
System.out.println("onSubscribe");
})
.onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
.buffer(1, TimeUnit.SECONDS)
.doOnTerminate(() -> {
System.out.println("onTerminate");
})
.onErrorReturn(error -> {
System.out.println("Error, will cancel scan: " + error);
throw new RuntimeException(error);
})
.subscribe(objs -> /* NIO work here */);当我在源发射器上调用onComplete时,紧接着onNext调用,我在subscribe lambda中得到一个中断。我认为这是预期的行为:https://github.com/ReactiveX/RxJava/issues/3601。这似乎发生在订阅者从缓冲区处理下游对象时(我猜如果它是单线程的,就不会有问题)。
这是一个问题,因为我在这里执行NIO工作,对于在Thread.interrupt上发生的事情有非常具体的要求。这发生在缓冲区完成发送所有工作之前,因此有些objs没有被完全处理。
这与Scheduler有关吗?我应该使用IO一号吗?如何“保护”在订阅服务器中执行的工作?
发布于 2020-03-06 16:49:33
我应该在IO线程上注册我的订阅者:
.subscribeOn(Schedulers.io(), false)
.subscribe(objs -> /* NIO work here */);我想我假设,由于上游的buffer,中断被放置在来自buffer的线程上,这也会被subscribeOn选择的任何线程所做。但情况似乎并非如此。
如果有人能提供更好的解释,我想读一读。
https://stackoverflow.com/questions/60567165
复制相似问题