谁知道如何将Firebase与RxJava连接起来,这样当我从数据库加载所有数据时,它就会运行arrayAdapter.notifyDataSetChanged() ??我正在考虑用onComplete()方法编写它,但它仍然在加载所有数据之前运行
Completable.fromCallable(new Callable<List<cards>>() {
@Override
public List<cards> call() throws Exception {
newUserDb.addListenerForSingleValueEvent(new ValueEventListener() {
@Override
public void onDataChange(@NonNull DataSnapshot dataSnapshot) {
if (dataSnapshot.child(currentUID).child("sex").exists()) {
myInfo.put("sex", dataSnapshot.child(currentUID).child("sex").getValue().toString());
}
if (dataSnapshot.child(currentUID).child("dateOfBirth").exists()) {
int myAge = stringDateToAge(dataSnapshot.child(currentUID).child("dateOfBirth").getValue().toString());
myInfo.put("age", String.valueOf(myAge));
}
if (dataSnapshot.child(currentUID).child("connections").child("yes").exists()) {
for (DataSnapshot ds : dataSnapshot.child(currentUID).child("connections").child("yes").getChildren()) {
if (!dataSnapshot.child(currentUID).child("connections").child("matches").hasChild(ds.getKey())) {
Log.d("rxJava", "onDataChange: " + ds.getKey());
first.add(ds.getKey());
getTagsPreferencesUsers(dataSnapshot.child(ds.getKey()), true);
}
}
}
}
@Override
public void onCancelled(@NonNull DatabaseError databaseError) {
}
});
return rowItems;
}
}).subscribeOn(Schedulers.computation())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d("rxJava", "Test RxJAVA, onSubscribe");
}
@Override
public void onComplete() {
Log.d("rxJava", "Test RxJAVA, onComplete");
}
@Override
public void onError(Throwable error) {
Log.d("rxJava", "Test RxJAVA, onError");
}
});输出结果是
2020-06-04 23:36:28.797 29515-29515/com.example.tinderapp D/rxJava: Test RxJAVA, onSubscribe
2020-06-04 23:36:28.800 29515-29612/com.example.tinderapp D/rxJava: Test RxJAVA, onComplete
2020-06-04 23:36:29.018 29515-29515/com.example.tinderapp D/rxJava: onDataChange: a4hqGgAJBRTVJOlPp3blNDt5v7q1
2020-06-04 23:36:29.022 29515-29515/com.example.tinderapp D/rxJava: onDataChange: aA9HAOtaB7ao6vzKqqBNp0iaBev2发布于 2020-06-05 16:52:23
我想说,这是预期行为,如下所述:
Completable.fromCallable
fromCallable接受一个Lambda,它在订阅时返回一个列表。在您的例子中,也打开了一个数据库连接,这基本上是失败的,因为回调是通过回调非阻塞注册的。
subscribeOn
这确保了从给定的调度器调用来自fromCallable的subscribeAcutal。因此,订阅线程和发出线程是解耦的。
首先获取onComplete,因为fromCallable将立即返回rowItems,并且数据库连接将保持打开状态,因为您没有删除侦听器。一段时间后,您将获得数据库回调日志,因为数据库连接仍然是打开的,侦听器仍然是注册的。
你想实际做一些类似这样的事情:
Single.create<List<Card>> { emitter ->
// register onChange callback to database
// callback will be called, when a value is available
// the Single will stay open, until emitter#onSuccess is called with a collected list.
newUserDb.addListenerForSingleValueEvent {
// do some stuff
emitter.onSuccess(listOf()) // return collected data from database here...
}
emitter.setCancellable {
// unregister addListenerForSingleValueEvent from newUserDb here
}
}.subscribeOn(Schedulers.computation())
.subscribe(
// stuff
)如果你想有一个恒定的更新流,用可观察的/流动的交换单个
https://stackoverflow.com/questions/62204649
复制相似问题