我开始使用BehaviorSubject类
因为我需要广播流
一个特殊的StreamController,它捕获已添加到控制器的最新项,并将其作为第一个项发送到任何新侦听器。
测试代码如下:
import 'package:rxdart/rxdart.dart';
void main() {
BehaviorSubject<int> subject = BehaviorSubject<int>.seeded(0);
subject.stream.listen((a) => print("listener 1 : $a"));
subject.add(1);
subject.add(2);
subject.stream.listen((a) => print("listener 2 : $a"));
subject.add(3);
subject.stream.listen((a) => print("listener 3 : $a"));
subject.add(4);
subject.close();
}结果:
listener 1 : 0
listener 1 : 1
listener 2 : 2
listener 2 : 3
listener 3 : 3
listener 3 : 4
listener 1 : 2
listener 2 : 4
listener 1 : 3
listener 1 : 4
Exited虽然我需要即时侦听流的存储值的行为,但结果顺序在我的代码上是如此不可预测。
我需要BehaviorSubject类的更多规范,但无法了解流为什么会这样运行。
我想知道这个API是如何工作的。
发布于 2022-07-06 09:51:13
BehaviorSubject并不是真正的广播流。它是一个多播流(多个侦听器),但它们并不像广播流那样同时得到相同的事件。(因为他们在听的时候得到了最新的消息)。
显示行为的最可能原因(我猜,我还没有检查BehaviorSubject实现,我只是碰巧很好地理解Dart流)是事件传递是异步的,事件被排队。
如果在print("done");之后添加subject.close();,它将在任何listener x: y行之前打印。这是因为在此之前,代码所做的所有工作都是调度微任务,以便稍后交付事件。
BehaviorSubject似乎安排了一个微任务来向每个侦听器传递第一个事件(可能直接调用onData ),然后在下一次值被add编辑到subject时再发送一个微任务,这是将其添加到侦听器的事件队列中的地方。之后,向队列中添加更多的事件不会触发更多的微任务。相反,队列元素将一个接一个地传递,在每个任务之后请求一个新的微任务。
这将给出以下行为(符号M1:L23意味着微任务M1计划将事件3交付给侦听器2):
+─────────────+───────────────────────────────────────────────────────────────────────────────+──────────────────+
| action | microtasks | prints |
+─────────────+───────────────────────────────────────────────────────────────────────────────+──────────────────+
| `listen` 1 | M1:L1[0] | |
| `add(1)` | M1:L1[0],M2:L1[1] | |
| `add(2)` | M1:L1[0],M2:L1[1, 2] | |
| `listen` 2 | M1:L1[0],M2:L1[1, 2],M3:L2[2] | |
| `add(3)` | M1:L1[0],M2:L1[1,2,3],M3:L2[2],M4:L2[3] | |
| `listen` 3 | M1:L1[0],M2:L1[1,2,3],M3:L2[2],M4:L2[3],M5:L3[3] | |
| `add(4)` | M1:L1[0],M2:L1[1,2,3,4],M3:L2[2],M4:L2[3,4],M5:L3[3],M6:L3[4] | |
| `close` | M1:L1[0],M2:L1[1,2,3,4,DONE],M3:L2[2],M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE] | |
| microtask | M2:L1[1,2,3,4,DONE],M3:L2[2],M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE] | `listener 1: 0` |
| microtask | M3:L2[2],M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE],M7:L1[2,3,4,DONE] | `listener 1: 1` |
| microtask | M4:L2[3,4,DONE],M5:L3[3],M6:L3[4,DONE],M7:L1[2,3,4,DONE] | `listener 2: 2` |
| microtask | M5:L3[3],M6:L3[4,DONE],M7:L1[2,3,4,DONE],M8:L2[4,DONE], | `listener 2: 3` |
| microtask | M6:L3[4,DONE],M7:L1[2,3,4,DONE],M8:L2[4,DONE], | `listener 3: 3` |
| microtask | M7:L1[2,3,4,DONE],M8:L2[4,DONE],M9:L3[DONE] | `listener 3: 4` |
| microtask | M8:L2[4,DONE],M9:L3[DONE],M10:L1[3,4,DONE] | `listener 1: 2` |
| microtask | M9:L3[DONE],M10:L1[3,4,DONE],M11:L2[DONE] | `listener 2: 4` |
| microtask | M10:L1[3,4,DONE],M11:L2[DONE] | |
| microtask | M11:L2[DONE],M12:L1[4,DONE] | `listener 1: 3 |
| microtask | M12:L1[4,DONE] | |
| microtask | M13:L1[DONE] | `listener 1: 4 |
| microtask | | |
+─────────────+───────────────────────────────────────────────────────────────────────────────+──────────────────+您可以看到这与您的结果相匹配。所以,事情的运作方式取决于它们的实现方式,因为它依赖于一个事件队列,它在交付每个事件后将自己放在microtask队列的后面,所以最长的队列将最后完成。
在调度和运行微任务时,您可以通过打印来探索这种行为:
import 'dart:async';
import 'package:rxdart/rxdart.dart';
void main() {
int ctr = 0;
runZoned(() {
BehaviorSubject<int> subject = BehaviorSubject<int>.seeded(0);
print("listen 1");
subject.stream.listen((a) => print("listener 1 : $a"), onDone: () {
print('listener 1 done');
});
print("add 1");
subject.add(1);
print("add 2");
subject.add(2);
print("listen 2");
subject.stream.listen((a) => print("listener 2 : $a"), onDone: () {
print('listener 2 done');
});
print("add 3");
subject.add(3);
print("listen 3");
subject.stream.listen((a) => print("listener 3 : $a"), onDone: () {
print('listener 3 done');
});
print("add 4");
subject.add(4);
print("close");
subject.close();
print("done");
}, zoneSpecification: ZoneSpecification(scheduleMicrotask: (s, p, z, f) {
var id = ++ctr;
print("Schedule microtask $id");
p.scheduleMicrotask(z, () {
print("Run microtask $id");
f();
});
}));
}其中的指纹:
listen 1
Schedule microtask 1
add 1
Schedule microtask 2
add 2
listen 2
Schedule microtask 3
add 3
Schedule microtask 4
listen 3
Schedule microtask 5
add 4
Schedule microtask 6
close
done
Run microtask 1
listener 1 : 0
Run microtask 2
listener 1 : 1
Schedule microtask 7
Run microtask 3
listener 2 : 2
Run microtask 4
listener 2 : 3
Schedule microtask 8
Run microtask 5
listener 3 : 3
Run microtask 6
listener 3 : 4
Schedule microtask 9
Run microtask 7
listener 1 : 2
Schedule microtask 10
Run microtask 8
listener 2 : 4
Schedule microtask 11
Run microtask 9
Run microtask 10
listener 1 : 3
Schedule microtask 12
Run microtask 11
listener 3 done
listener 2 done
Run microtask 12
listener 1 : 4
Schedule microtask 13
Run microtask 13
Schedule microtask 14
Run microtask 14
listener 1 done(“完成”延迟了,可能是因为他们需要等待另一个未来。)
https://stackoverflow.com/questions/72880092
复制相似问题