首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >rxjava 2.2.2,偶尔会出现死锁

rxjava 2.2.2,偶尔会出现死锁
EN

Stack Overflow用户
提问于 2021-07-28 17:13:22
回答 1查看 40关注 0票数 0

rxjava 2.2.2,转储线程中偶尔出现死锁

代码如下,这是一个问题吗?

代码语言:javascript
复制
Observable.fromCallable(() -> {
    Observable
        .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
        .flatMap(i -> Observable
        .fromCallable(() -> service.getType(type))
        .subscribeOn(Schedulers.io()),false, 5)
        .observeOn(Schedulers.io())
        .buffer(3)
        .blockingSubscribe(types -> {
            // ...
        });
        return Result.success();
    }
).retryWhen(new RetryWithDelay(5, 2000))
    .observeOn(Schedulers.io(), true)
    .subscribeOn(Schedulers.io())
    .subscribe(result -> {
        
    }, throwable -> {
        
    });

线程转储如下所示

代码语言:javascript
复制
"RxCachedThreadScheduler-5234" #10684 daemon prio=5 os_prio=0 tid=0x00007f690c02f800 nid=0x29f8 waiting on condition [0x00007f68008ce000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000000c4b73b28> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:56)
        at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:103)
        at io.reactivex.Observable.blockingSubscribe(Observable.java:5414)
        at com.test.Test.lambda$test$5(Test.java:221)
        at com.test.Test$$Lambda$79/1184820793.call(Unknown Source)
        at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:43)
        at io.reactivex.Observable.subscribe(Observable.java:12090)
        at io.reactivex.internal.operators.observable.ObservableRetryWhen$RepeatWhenObserver.subscribeNext(ObservableRetryWhen.java:150)
        at io.reactivex.internal.operators.observable.ObservableRetryWhen.subscribeActual(ObservableRetryWhen.java:60)

希望能得到答案,非常感谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-07-28 09:24:47

如果在RxJava中使用blockingX方法,可能会导致死锁。根据您的示例,根本没有理由使用blockingSubscribe。试着这样做:

代码语言:javascript
复制
Observable.defer(() -> {
    return Observable
        .intervalRange(1, num, 20, 100, TimeUnit.MILLISECONDS)
        .flatMap(i -> Observable.fromCallable(() -> service.getType(type))
                     .subscribeOn(Schedulers.io()),
              false, 5
        )
        .observeOn(Schedulers.io())
        .buffer(3)
        .doOnNext(types -> {
            // ...
        })
        .ignoreElements()
        .andThen(Observable.just(Result.success()));
    }
).retryWhen(new RetryWithDelay(5, 2000))
    .observeOn(Schedulers.io(), true)
    .subscribeOn(Schedulers.io())
    .subscribe(result -> {
        
    }, throwable -> {
        
    });
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68557559

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档